summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-02-14 08:21:37 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-02-14 08:21:37 +0000
commitc5b44d59395ba7ebe8c84ce6461c4e39a0e5b99a (patch)
treeecd23c5206e43d23936b62da6a790fc66d0c69c1
parentb4b42b55d78674660d31609c40022988ddb8e318 (diff)
downloadqpid-python-c5b44d59395ba7ebe8c84ce6461c4e39a0e5b99a.tar.gz
QPID-346 Message loss after rollback/recover
Messages were still occasionally being sent twice. AMQChannel - added trace level logging that will show an error if the same message is attempted to be sent to the same client. AMQMessage - Remove logic that says the same subscriber can take always 'take' the message. SubscriptionImpl - Release message when it is put back on to the resendQueue this will allow it to be re-'taken' AMQSession - Added method to Dispatcher to clean up incomming _queue to try and prevent messages arriving for closed consumers. BasicMessageConsumer - added comments git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@507433 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java66
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java5
5 files changed, 87 insertions, 6 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index e056af55cf..90ab71f703 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -372,6 +372,20 @@ public class AMQChannel
+ _unacknowledgedMessageMap.size() + ":" + _unacknowledgedMessageMap.toString());
}
+ //Debug adding messages to this map.
+ if (_log.isTraceEnabled())
+ {
+ for (Map.Entry<Long, UnacknowledgedMessage> entry : _unacknowledgedMessageMap.entrySet())
+ {
+ if (entry.getValue().message == message)
+ {
+ // this is set at error level but only output it if we are tracing.
+ _log.error("Adding message (" + System.identityHashCode(message) +
+ ") that is already in unacked map entryTag:"
+ + entry.getKey() + " dT:" + deliveryTag);
+ }
+ }
+ }
_unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
_lastDeliveryTag = deliveryTag;
checkSuspension();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index af7d7ea493..e6f0cc282b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -448,10 +448,6 @@ public class AMQMessage
{
if (_taken.getAndSet(true))
{
- if (sub == _takenBySubcription)
- {
- return false;
- }
return true;
}
else
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index e6e3a9cadb..230430ab12 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -537,6 +537,10 @@ public class SubscriptionImpl implements Subscription
public void addToResendQueue(AMQMessage msg)
{
+ //fixme - will this be ok as we need to ensure redelivery to same subscriber first
+ //release the message so it can be redelivered
+ msg.release();
+
// add to our resend queue
getResendQueue().add(msg);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index e475270ecd..783678f67c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -47,6 +47,7 @@ import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
+import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -289,6 +290,61 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
+
+ /**
+ * The dispatcher should be stopped when calling this.
+ *
+ * @param consumerTag
+ */
+ public void removePending(String consumerTag)
+ {
+
+ synchronized (_lock)
+ {
+ boolean stopped = connectionStopped();
+
+ _dispatcher.setConnectionStopped(false);
+
+ LinkedList<UnprocessedMessage> tmpList = new LinkedList<UnprocessedMessage>();
+
+ while (_queue.size() != 0)
+ {
+ UnprocessedMessage message = null;
+ try
+ {
+ message = (UnprocessedMessage) _queue.take();
+
+ if (!message.deliverBody.consumerTag.equals(consumerTag))
+ {
+ tmpList.add(message);
+ }
+ else
+ {
+ _logger.error("Pruned pending message for consumer:" + consumerTag);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ _logger.error("Interrupted whilst taking message");
+ }
+ }
+
+ if (!tmpList.isEmpty())
+ {
+ _logger.error("Tmp list is not empty");
+ }
+
+ for (UnprocessedMessage msg : tmpList)
+ {
+ _queue.add(msg);
+ }
+
+ if (stopped)
+ {
+ _dispatcher.setConnectionStopped(stopped);
+ }
+ }
+ }
}
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
@@ -599,8 +655,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
//Ensure we only try and close an open session.
if (!_closed.getAndSet(true))
{
- // we pass null since this is not an error case
- closeProducersAndConsumers(null);
try
{
@@ -618,6 +672,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// When control resumes at this point, a reply will have been received that
// indicates the broker has closed the channel successfully
+ // we pass null since this is not an error case
+ closeProducersAndConsumers(null);
+
}
catch (AMQException e)
{
@@ -1784,7 +1841,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
void deregisterConsumer(BasicMessageConsumer consumer)
{
+ //need to clear pending messages from session _queue that the dispatcher will handle
+ // or we will get
+ // _dispatcher.removePending(consumer.getConsumerTag());
+
_consumers.remove(consumer.getConsumerTag());
+
String subscriptionName = _reverseSubscriptionMap.remove(consumer);
if (subscriptionName != null)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 058afab605..1607326e47 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -481,8 +481,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
+
+ //this will remove consumer from _consumers map
deregisterConsumer();
+
+ // clears unacks from this consumer
_unacknowledgedDeliveryTags.clear();
+
if (_messageListener != null && _receiving.get())
{
_logger.info("Interrupting thread: " + _receivingThread);