summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-02-13 11:46:37 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-02-13 11:46:37 +0000
commitc257715e5f77e6e3bbddb4ccced0645ac0199698 (patch)
tree252d4165d626e4c890d73db22dd6122c4d09a5db
parent01b4cfa2d93ba951d0983bc4cd4b94dd87ea9400 (diff)
downloadqpid-python-c257715e5f77e6e3bbddb4ccced0645ac0199698.tar.gz
QPID-346 Message loss after rollback/recover
With Multiple consumers closing and requeuing occasionally a message would be lost given the use of msg.getDeliveredToConsumer() as this will be set on the first send an so on the infrequent occasion that a subscriber closes whilst a message is being delivered then that message would be lost. AMQChannel - Fixed bug where messages would not be requeued on consumer closure. Increased quantity of logging. AMQMessage - Added method to get 'taken' status. AMQQueue - Wrapped all log messages with correct is<X>Enabled() also removed debug() method as this makes debugging very difficult (log4j will always report the same log line, requiring searching of the file to fine the actual log line.) ConcurrentSelectorDeliveryManager - Increased and enclosed logging (isXEnabled). Wrapped the send calls with a lock on the Subscription(Impl) such that a send will not occur if the Subscription(Impl) has been closed. SubscriptionImpl - Used sendLock to set SI closed. This is used to mark subscription as suspended so no messages will be sent to it. Increased and wrapped logging. SubscriptionSet - Added locking around the insertion and removal of entries to _subscription. As we need to retrieve the actual Subscription from the map when removing rather than the dummy object created for lookup. This requires two call to _subcription which is there for not thread safe. log4j - updated to have handy debug defaults that simply need uncommented. Test to follow, once cleaned up. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@506979 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/etc/log4j.xml19
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java26
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java35
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java77
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java69
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java32
7 files changed, 200 insertions, 63 deletions
diff --git a/qpid/java/broker/etc/log4j.xml b/qpid/java/broker/etc/log4j.xml
index 810b65798e..7230a55dbd 100644
--- a/qpid/java/broker/etc/log4j.xml
+++ b/qpid/java/broker/etc/log4j.xml
@@ -47,14 +47,25 @@
</category>
<category name="org.apache.qpid.framing.AMQDataBlockEncoder">
- <priority value="info"/>
- </category>
+ <priority value="info"/>
+ </category>
+ <!--category name="org.apache.qpid.server.queue.SubscriptionImpl">
+ <priority value="trace"/>
+ </category>
- <category name="org.apache.qpid">
+ <category name="org.apache.qpid.server.queue.ConcurrentSelectorDeliveryManager">
+ <priority value="trace"/>
+ </category>
+
+ <category name="org.apache.qpid.server.AMQChannel">
+ <priority value="trace"/>
+ </category -->
+
+ <category name="org.apache.qpid">
<priority value="warn"/>
</category>
-
+
<root>
<priority value="info"/>
<appender-ref ref="STDOUT"/>
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index e1b6497062..4b029f88c6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -310,13 +310,15 @@ public class AMQChannel
{
if (_log.isTraceEnabled())
{
- _log.trace("Unsubscribed consumer:" + consumerTag);
+ _log.trace("Unsubscribed consumer:" + consumerTag + "on Session " + session +
+ " Unacked Map Size:" + _unacknowledgedMessageMap.size());
}
AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
if (q != null)
{
q.unregisterProtocolSession(session, _channelId, consumerTag);
}
+ requeue();
}
/**
@@ -358,15 +360,18 @@ public class AMQChannel
*/
public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, String consumerTag, AMQQueue queue)
{
- if (_log.isTraceEnabled())
- {
- _log.trace("Adding unackedMessage (" + System.identityHashCode(message) + ") for channel " + _channelId +
- " with delivery tag " + deliveryTag + " and consumerTag " + consumerTag +
- " from queue:" + queue.getName());
- }
-
synchronized (_unacknowledgedMessageMapLock)
{
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Adding unackedMessage (" + System.identityHashCode(message) + ") for channel " + _channelId +
+ "(" + System.identityHashCode(this) + ")" +
+ " with delivery tag " + deliveryTag + " and consumerTag " + consumerTag +
+ " from queue:" + queue.getName() +
+ " unackedSize[" + System.identityHashCode(_unacknowledgedMessageMap) + "](pre-put):"
+ + _unacknowledgedMessageMap.size() + ":" + _unacknowledgedMessageMap.toString());
+ }
+
_unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
_lastDeliveryTag = deliveryTag;
checkSuspension();
@@ -405,6 +410,11 @@ public class AMQChannel
unacked.queue.deliver(unacked.message);
}
}
+
+ if (_unacknowledgedMessageMap.size() != 0)
+ {
+ _log.error("unack map is not empty after resend was item added to unack map whilst consumer is closing");
+ }
}
/** Called to resend all outstanding unacknowledged messages to this same channel. */
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 8ba3cc5686..af7d7ea493 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -471,4 +471,9 @@ public class AMQMessage
{
return _takenBySubcription;
}
+
+ public boolean isTaken()
+ {
+ return _taken.get();
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 99bf9ca31d..e6882906ff 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -341,7 +341,12 @@ public class AMQQueue implements Managable, Comparable
public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters, boolean noLocal)
throws AMQException
{
- debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(MessageFormat.format("Registering protocol session {0} with channel {1} [{4}] and " +
+ "consumer tag {2} with {3}",
+ ps, channel, consumerTag, this, System.identityHashCode(channel)));
+ }
Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this);
@@ -358,8 +363,13 @@ public class AMQQueue implements Managable, Comparable
public void unregisterProtocolSession(AMQProtocolSession ps, int channel, String consumerTag) throws AMQException
{
- debug("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag,
- this);
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(MessageFormat.format("Unregistering protocol session {0} with channel {1} [{4}] " +
+ "and consumer tag {2} from {3}",
+ ps, channel, consumerTag, this, System.identityHashCode(channel)));
+ }
Subscription removedSubscription;
if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel,
@@ -371,6 +381,12 @@ public class AMQQueue implements Managable, Comparable
" and protocol session key " + ps.getKey() + " not registered with queue " + this);
}
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(MessageFormat.format("Removed consumer tag {0} with channel {1} [{3}] from {2}",
+ consumerTag, channel, this, System.identityHashCode(channel)));
+ }
+
removedSubscription.close();
// if we are eligible for auto deletion, unregister from the queue registry
@@ -412,7 +428,10 @@ public class AMQQueue implements Managable, Comparable
protected void autodelete() throws AMQException
{
- debug("autodeleting {0}", this);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(MessageFormat.format("autodeleting {0}", this));
+ }
delete();
}
@@ -516,14 +535,6 @@ public class AMQQueue implements Managable, Comparable
return "Queue(" + _name + ")@" + System.identityHashCode(this);
}
- private void debug(String msg, Object... args)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug(MessageFormat.format(msg, args));
- }
- }
-
public long getMinimumAlertRepeatGap()
{
return _minimumAlertRepeatGap;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 9b79657575..cf7d5bbc68 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -254,11 +254,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
// message will be null if we have no messages in the messageQueue.
if (message == null)
{
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")");
+ }
return;
}
if (_log.isDebugEnabled())
{
- _log.debug("Async Delivery Message (" + System.identityHashCode(message) + ") to :" + System.identityHashCode(this));
+ _log.debug("Async Delivery Message (" + System.identityHashCode(message) +
+ ") by :" + System.identityHashCode(this) +
+ ") to :" + System.identityHashCode(sub));
}
sub.send(message, queue);
@@ -333,17 +339,28 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public void deliver(String name, AMQMessage msg) throws FailedDequeueException
{
- if (_log.isTraceEnabled())
+ if (_log.isDebugEnabled())
{
- _log.trace(id() + "deliver :" + System.identityHashCode(msg));
+ _log.debug(id() + " Deliver :" + System.identityHashCode(msg) + ")");
}
//Check if we have someone to deliver the message to.
_lock.lock();
try
{
+ if (_log.isTraceEnabled())
+ {
+ _log.trace(id() + " Getting next Subscriber for message :" + System.identityHashCode(msg) + ")");
+ }
+
Subscription s = _subscriptions.nextSubscriber(msg);
+ if (_log.isTraceEnabled())
+ {
+ _log.trace(id() + " Subscriber (" + System.identityHashCode(s) + ")" +
+ " selected for message :" + System.identityHashCode(msg) + ")");
+ }
+
if (s == null) //no-one can take the message right now.
{
if (_log.isDebugEnabled())
@@ -392,18 +409,58 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
else
{
+ if (_log.isTraceEnabled())
+ {
+ _log.trace(id() + " About to take sendLock for subscriber :" + System.identityHashCode(s) +
+ " to deliver message:" + System.identityHashCode(msg));
+ }
+
//release lock now
_lock.unlock();
- if (_log.isDebugEnabled())
+ synchronized (s.sendlock())
{
- _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
- System.identityHashCode(s) + ") :" + s);
+ if (!s.isSuspended())
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
+ System.identityHashCode(s) + ") :" + s);
+ }
+
+ //Mark message as taken
+ msg.taken(s);
+ //Deliver the message
+ s.send(msg, _queue);
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + " Subscription(" + System.identityHashCode(s) + ") became suspended between nextSubscriber and send");
+ }
+ }
+ }
+
+ if (!msg.isTaken())
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + " Message(" + System.identityHashCode(msg) + ") has not been taken so recursing!:" +
+ " Subscriber:" + System.identityHashCode(s));
+ }
+
+ deliver(name, msg);
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + " Message(" + System.identityHashCode(msg) +
+ ") has been taken so disregarding deliver request to Subscriber:" +
+ System.identityHashCode(s));
+ }
}
- //Mark message as taken
- msg.taken(s);
- //Deliver the message
- s.send(msg, _queue);
}
}
finally
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index a53e305e49..e6e3a9cadb 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -69,7 +69,7 @@ public class SubscriptionImpl implements Subscription
private boolean _closed = false;
private AMQQueue _queue;
- private final AtomicBoolean _resending = new AtomicBoolean(false);
+ private final AtomicBoolean _sendLock = new AtomicBoolean(false);
public static class Factory implements SubscriptionFactory
{
@@ -193,7 +193,18 @@ public class SubscriptionImpl implements Subscription
public String toString()
{
- return "[channel=" + channel + ", consumerTag=" + consumerTag + ", session=" + protocolSession.getKey() + "]";
+ String subscriber = "[channel=" + channel +
+ ", consumerTag=" + consumerTag +
+ ", session=" + protocolSession.getKey() +
+ ", resendQueue=" + (_resendQueue != null);
+
+ if (_resendQueue != null)
+ {
+ subscriber += ", resendSize=" + _resendQueue.size();
+ }
+
+
+ return subscriber + "]";
}
/**
@@ -239,7 +250,7 @@ public class SubscriptionImpl implements Subscription
{
channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
}
- ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(),msg.isRedelivered());
+ ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(), msg.isRedelivered());
AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
//fixme what is wrong with this?
@@ -275,7 +286,7 @@ public class SubscriptionImpl implements Subscription
channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
}
- ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(),msg.isRedelivered());
+ ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(), msg.isRedelivered());
AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
//fixme what is wrong with this?
@@ -292,7 +303,18 @@ public class SubscriptionImpl implements Subscription
public boolean isSuspended()
{
- return channel.isSuspended() && !_resending.get();
+ if (_logger.isTraceEnabled())
+ {
+ if (channel.isSuspended())
+ {
+ _logger.trace("Subscription(" + System.identityHashCode(this) + ") channel's is susupended");
+ }
+ if (_sendLock.get())
+ {
+ _logger.trace("Subscription(" + System.identityHashCode(this) + ") has sendLock set so closing.");
+ }
+ }
+ return channel.isSuspended() || _sendLock.get();
}
/**
@@ -386,7 +408,20 @@ public class SubscriptionImpl implements Subscription
public void close()
{
- _logger.info("Closing subscription:" + this);
+ synchronized (_sendLock)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Setting SendLock true");
+ }
+
+ _sendLock.set(true);
+
+ }
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing subscription (" + System.identityHashCode(this) + "):" + this);
+ }
if (_resendQueue != null && !_resendQueue.isEmpty())
{
@@ -411,17 +446,17 @@ public class SubscriptionImpl implements Subscription
));
_closed = true;
}
+
}
private void requeue()
{
-
if (_queue != null)
{
- _logger.trace("Requeuing :" + _resendQueue.size() + " messages");
-
- //Take control over to this thread for delivering messages from the Async Delivery.
- setResending(true);
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("Requeuing :" + _resendQueue.size() + " messages");
+ }
while (!_resendQueue.isEmpty())
{
@@ -441,8 +476,6 @@ public class SubscriptionImpl implements Subscription
}
}
- setResending(false);
-
if (!_resendQueue.isEmpty())
{
_logger.error("[MESSAGES LOST]Unable to re-deliver messages as queue is null.");
@@ -462,14 +495,6 @@ public class SubscriptionImpl implements Subscription
_resendQueue = null;
}
- private void setResending(boolean resending)
- {
- synchronized (_resending)
- {
- _resending.set(resending);
- }
- }
-
public boolean isBrowser()
{
return _isBrowser;
@@ -528,7 +553,7 @@ public class SubscriptionImpl implements Subscription
public Object sendlock()
{
- return _resending;
+ return _sendLock;
}
private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange, boolean redelivered)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
index c4dab50ff4..83570953c2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
@@ -32,11 +32,12 @@ class SubscriptionSet implements WeightedSubscriptionManager
{
private static final Logger _log = Logger.getLogger(SubscriptionSet.class);
- /** List of registered subscribers */
+ /** List of registered subscribers all edits must be done whilst holidng _subscriptionsChange */
private List<Subscription> _subscriptions = new CopyOnWriteArrayList<Subscription>();
/** Used to control the round robin delivery of content */
private int _currentSubscriber;
+ private final Object _subscriptionsChange = new Object();
/** Accessor for unit tests. */
int getCurrentSubscriber()
@@ -46,7 +47,10 @@ class SubscriptionSet implements WeightedSubscriptionManager
public void addSubscriber(Subscription subscription)
{
- _subscriptions.add(subscription);
+ synchronized (_subscriptionsChange)
+ {
+ _subscriptions.add(subscription);
+ }
}
/**
@@ -59,13 +63,27 @@ class SubscriptionSet implements WeightedSubscriptionManager
public Subscription removeSubscriber(Subscription subscription)
{
// TODO: possibly need O(1) operation here.
- int subIndex = _subscriptions.indexOf(subscription);
- if (subIndex != -1)
+ Subscription sub = null;
+ synchronized (_subscriptionsChange)
+ {
+ int subIndex = _subscriptions.indexOf(subscription);
+
+ if (subIndex != -1)
+ {
+ //we can't just return the passed in subscription as it is a new object
+ // and doesn't contain the stored state we need.
+ //NOTE while this may be removed now anyone with an iterator will still have it in the list!!
+ sub = _subscriptions.remove(subIndex);
+ }
+ else
+ {
+ _log.error("Unable to remove from index(" + subIndex + ")subscription:" + subscription);
+ }
+ }
+ if (sub != null)
{
- //we can't just return the passed in subscription as it is a new object
- // and doesn't contain the stored state we need.
- return _subscriptions.remove(subIndex);
+ return sub;
}
else
{