diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-10-17 16:39:20 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-10-17 16:39:20 +0000 |
commit | 262d5eae3c1a4a3fe85ee4c182c6ccc3316fd2f9 (patch) | |
tree | fc1d30ba7cdd2dcfebc5a7e24cc55cdabac8a0b2 | |
parent | 8986f4f9c37ef66d8de3c64dd3bc16692db9f326 (diff) | |
download | qpid-python-262d5eae3c1a4a3fe85ee4c182c6ccc3316fd2f9.tar.gz |
QPID-643 : CSDM causes duplicate message delivery. Don't deliver messages that have been taken.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@585565 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java | 35 |
1 files changed, 34 insertions, 1 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index a1d31d2cbd..9564225190 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -883,7 +883,36 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" + System.identityHashCode(s) + ") :" + s); } - msg.taken(_queue, s); + + if (msg.taken(_queue, s)) + { + //Message has been delivered so don't redeliver. + // This can currently occur because of the recursive call below + // During unit tests the send can occur + // client then rejects + // this reject then releases the message by the time the + // if(!msg.isTaken()) call is made below + // the message has been released so that thread loops to send the message again + // of course by the time it gets back to here. the thread that released the + // message is now ready to send it. Here is a sample trace for reference +//1192627162613:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false] +//1192627162613:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null} +//1192627162613:Thread[pool-917-thread-4,5,main]:28398657 Sent :dt:214 msg:(HC:5529738 ID:145 Ref:1) +//1192627162613:Thread[pool-917-thread-2,5,main]:Reject message by:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false] +//1192627162613:Thread[pool-917-thread-2,5,main]:Releasing Message:(HC:5529738 ID:145 Ref:1) +//1192627162613:Thread[pool-917-thread-2,5,main]:Msg:Release:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:This:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]} +//1192627162613:Thread[pool-917-thread-2,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false] +//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:suspended: Message((HC:5529738 ID:145 Ref:1)) has not been taken so recursing!: Subscriber:28398657 +//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false] +//1192627162629:Thread[pool-917-thread-2,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null} +//1192627162629:Thread[pool-917-thread-2,5,main]:25386607 Sent :dt:172 msg:(HC:5529738 ID:145 Ref:1) +//1192627162629:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=true} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]} + // Note: In the last request to take the message from thread 4,5 the message has been + // taken by the previous call done by thread 2,5 + + + return; + } //Deliver the message s.send(msg, _queue); } @@ -897,6 +926,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } + // + // Why do we do this? What was the reasoning? We should have a better approach + // than recursion and rejecting if someone else sends it before we do. + // if (!msg.isTaken(_queue)) { if (debugEnabled) |