From c7a1a46260b8cdec96a4d8de9c54fb47f6300837 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 17 Oct 2007 19:59:58 +0000 Subject: Merged revisions 573738-573739,573741-574077,574079-574236,574238-574265,574267-574503,574505-574554,574556-574584,574586-574873,574875-574901,574903-575737,575739-575787,575789-575810,575812-577772,577774-577940,577942-578057,578059-578732,578734,578736-578744,578746-578827,578829-578844,578846-579114,579116-579146,579148-579197,579199-579228,579230-579573,579575-579576,579579-579601,579603-579613,579615-579708,579710-580021,580023-580039,580042-580060,580062-580065,580067-580080,580082-580257,580259-580264,580266-580350,580352-580984,580986-580991,580994-581001,581003-581170,581172-581188,581190-581206,581208-581245,581247-581292,581294-581539,581541-581565,581567-581620,581622-581626,581628-581646,581648-581967,581969-582197,582199-582200,582203-582204,582206-582262,582264,582267-583084,583087,583089-583104,583106-583146,583148-583153,583155-583169,583171-583172,583174-583398,583400-583414,583416-583417,583419-583437,583439-583482,583484-583517,583519-583545,583547,583549-583774,583777-583807,583809-583881,583883-584107,584109-584112,584114-584123,584125-585653 via svnmerge from https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1 ........ r585565 | ritchiem | 2007-10-17 17:39:20 +0100 (Wed, 17 Oct 2007) | 1 line QPID-643 : CSDM causes duplicate message delivery. Don't deliver messages that have been taken. ........ r585570 | ritchiem | 2007-10-17 17:48:01 +0100 (Wed, 17 Oct 2007) | 1 line Update to AMQMessage to reset the deliveredToConsumer flag(false) when the message is released. This flag is now used by more than the immediate delivery. It is also used to understand if the message has been delivered so that we can tell the message should not be purged. ........ r585575 | ritchiem | 2007-10-17 17:59:42 +0100 (Wed, 17 Oct 2007) | 1 line QPID-647 : Update to ConcurrentSelectorDeliveryManager to restart async process if a msg is queued that has the potential to be delivered. ........ r585642 | rgodfrey | 2007-10-17 20:42:14 +0100 (Wed, 17 Oct 2007) | 1 line QPID-645 : TxnBuffer should rethrow exceptions encountered on commit ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@585655 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/server/queue/AMQMessage.java | 8 ++- .../queue/ConcurrentSelectorDeliveryManager.java | 58 ++++++++++++++++++++-- .../java/org/apache/qpid/server/txn/TxnBuffer.java | 28 ++++++++--- .../org/apache/qpid/server/txn/TxnBufferTest.java | 12 ++++- 4 files changed, 92 insertions(+), 14 deletions(-) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 01d0d870d7..dd9f32a306 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -133,7 +133,7 @@ public class AMQMessage public boolean isReferenced() { return _referenceCount.get() > 0; - } + } /** * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory @@ -558,6 +558,7 @@ public class AMQMessage taken.set(false); } + _deliveredToConsumer = false; _takenMap.put(queue, taken); _takenBySubcriptionMap.put(queue, null); } @@ -694,7 +695,10 @@ public class AMQMessage return false; } - /** Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). */ + /** + * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). + * And for selector efficiency. + */ public void setDeliveredToConsumer() { _deliveredToConsumer = true; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index a1d31d2cbd..d872b50458 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -211,6 +211,15 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } + /** + * + * @return the state of the async processor. + */ + public boolean isProcessingAsync() + { + return _processing.get(); + } + /** * Returns all the messages in the Queue * @@ -821,6 +830,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { addMessageToQueue(msg, deliverFirst); + //if we have a non-filtering subscriber but queued messages && we're not Async && we have other Active subs then something is wrong! + if ((s != null && hasQueuedMessages()) && !isProcessingAsync() && _subscriptions.hasActiveSubscribers()) + { + _queue.deliverAsync(); + } + //release lock now message is on queue. _lock.unlock(); @@ -883,7 +898,36 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" + System.identityHashCode(s) + ") :" + s); } - msg.taken(_queue, s); + + if (msg.taken(_queue, s)) + { + //Message has been delivered so don't redeliver. + // This can currently occur because of the recursive call below + // During unit tests the send can occur + // client then rejects + // this reject then releases the message by the time the + // if(!msg.isTaken()) call is made below + // the message has been released so that thread loops to send the message again + // of course by the time it gets back to here. the thread that released the + // message is now ready to send it. Here is a sample trace for reference +//1192627162613:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false] +//1192627162613:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null} +//1192627162613:Thread[pool-917-thread-4,5,main]:28398657 Sent :dt:214 msg:(HC:5529738 ID:145 Ref:1) +//1192627162613:Thread[pool-917-thread-2,5,main]:Reject message by:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false] +//1192627162613:Thread[pool-917-thread-2,5,main]:Releasing Message:(HC:5529738 ID:145 Ref:1) +//1192627162613:Thread[pool-917-thread-2,5,main]:Msg:Release:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:This:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]} +//1192627162613:Thread[pool-917-thread-2,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false] +//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:suspended: Message((HC:5529738 ID:145 Ref:1)) has not been taken so recursing!: Subscriber:28398657 +//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false] +//1192627162629:Thread[pool-917-thread-2,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null} +//1192627162629:Thread[pool-917-thread-2,5,main]:25386607 Sent :dt:172 msg:(HC:5529738 ID:145 Ref:1) +//1192627162629:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=true} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]} + // Note: In the last request to take the message from thread 4,5 the message has been + // taken by the previous call done by thread 2,5 + + + return; + } //Deliver the message s.send(msg, _queue); } @@ -897,6 +941,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } + // + // Why do we do this? What was the reasoning? We should have a better approach + // than recursion and rejecting if someone else sends it before we do. + // if (!msg.isTaken(_queue)) { if (debugEnabled) @@ -942,6 +990,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { public void run() { + String startName = Thread.currentThread().getName(); + Thread.currentThread().setName("CSDM-AsyncDelivery:" + startName); boolean running = true; while (running && !_movingMessages.get()) { @@ -957,6 +1007,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _processing.set(false); } } + Thread.currentThread().setName(startName); } } @@ -983,8 +1034,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager private String currentStatus() { - return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(M:H)") + - "(" + _messages.size() + ":" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() + ") " + + return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(H:M)") + + "(" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() + + ":" + (_messages.size() - ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize()) + ") " + " Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") + "(" + _hasContent.size() + ":" + _extraMessages.get() + ") " + " Active:" + _subscriptions.hasActiveSubscribers() + diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java index 405c233552..46a68b6a23 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java @@ -54,7 +54,7 @@ public class TxnBuffer _ops.clear(); } - private boolean prepare(StoreContext context) + private boolean prepare(StoreContext context) throws AMQException { for (int i = 0; i < _ops.size(); i++) { @@ -63,19 +63,31 @@ public class TxnBuffer { op.prepare(context); } - catch (Exception e) + catch (AMQException e) { - //compensate previously prepared ops - for (int j = 0; j < i; j++) - { - _ops.get(j).undoPrepare(); - } - return false; + undoPrepare(i); + throw e; + } + catch (RuntimeException e) + { + undoPrepare(i); + throw e; } } return true; } + private void undoPrepare(int lastPrepared) + { + //compensate previously prepared ops + for (int j = 0; j < lastPrepared; j++) + { + _ops.get(j).undoPrepare(); + } + } + + + public void rollback(StoreContext context) throws AMQException { for (TxnOp op : _ops) diff --git a/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java b/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java index 1d9e30c24e..025f8d89f6 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java @@ -27,6 +27,7 @@ import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; import java.util.LinkedList; +import java.util.NoSuchElementException; public class TxnBufferTest extends TestCase { @@ -78,7 +79,16 @@ public class TxnBufferTest extends TestCase buffer.enlist(new FailedPrepare()); buffer.enlist(new MockOp()); - buffer.commit(null); + try + { + buffer.commit(null); + + } + catch (NoSuchElementException e) + { + + } + validateOps(); store.validate(); } -- cgit v1.2.1