summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2007-10-17 19:59:58 +0000
committerRobert Godfrey <rgodfrey@apache.org>2007-10-17 19:59:58 +0000
commitc7a1a46260b8cdec96a4d8de9c54fb47f6300837 (patch)
tree4d56262981014358e65550e234e97136f0f798b9
parenta88cefc2e0df7b7a1d9119296d168508d602ee61 (diff)
downloadqpid-python-c7a1a46260b8cdec96a4d8de9c54fb47f6300837.tar.gz
Merged revisions 573738-573739,573741-574077,574079-574236,574238-574265,574267-574503,574505-574554,574556-574584,574586-574873,574875-574901,574903-575737,575739-575787,575789-575810,575812-577772,577774-577940,577942-578057,578059-578732,578734,578736-578744,578746-578827,578829-578844,578846-579114,579116-579146,579148-579197,579199-579228,579230-579573,579575-579576,579579-579601,579603-579613,579615-579708,579710-580021,580023-580039,580042-580060,580062-580065,580067-580080,580082-580257,580259-580264,580266-580350,580352-580984,580986-580991,580994-581001,581003-581170,581172-581188,581190-581206,581208-581245,581247-581292,581294-581539,581541-581565,581567-581620,581622-581626,581628-581646,581648-581967,581969-582197,582199-582200,582203-582204,582206-582262,582264,582267-583084,583087,583089-583104,583106-583146,583148-583153,583155-583169,583171-583172,583174-583398,583400-583414,583416-583417,583419-583437,583439-583482,583484-583517,583519-583545,583547,583549-583774,583777-583807,583809-583881,583883-584107,584109-584112,584114-584123,584125-585653 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1 ........ r585565 | ritchiem | 2007-10-17 17:39:20 +0100 (Wed, 17 Oct 2007) | 1 line QPID-643 : CSDM causes duplicate message delivery. Don't deliver messages that have been taken. ........ r585570 | ritchiem | 2007-10-17 17:48:01 +0100 (Wed, 17 Oct 2007) | 1 line Update to AMQMessage to reset the deliveredToConsumer flag(false) when the message is released. This flag is now used by more than the immediate delivery. It is also used to understand if the message has been delivered so that we can tell the message should not be purged. ........ r585575 | ritchiem | 2007-10-17 17:59:42 +0100 (Wed, 17 Oct 2007) | 1 line QPID-647 : Update to ConcurrentSelectorDeliveryManager to restart async process if a msg is queued that has the potential to be delivered. ........ r585642 | rgodfrey | 2007-10-17 20:42:14 +0100 (Wed, 17 Oct 2007) | 1 line QPID-645 : TxnBuffer should rethrow exceptions encountered on commit ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@585655 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java58
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java28
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java12
4 files changed, 92 insertions, 14 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 01d0d870d7..dd9f32a306 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -133,7 +133,7 @@ public class AMQMessage
public boolean isReferenced()
{
return _referenceCount.get() > 0;
- }
+ }
/**
* Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
@@ -558,6 +558,7 @@ public class AMQMessage
taken.set(false);
}
+ _deliveredToConsumer = false;
_takenMap.put(queue, taken);
_takenBySubcriptionMap.put(queue, null);
}
@@ -694,7 +695,10 @@ public class AMQMessage
return false;
}
- /** Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). */
+ /**
+ * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
+ * And for selector efficiency.
+ */
public void setDeliveredToConsumer()
{
_deliveredToConsumer = true;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index a1d31d2cbd..d872b50458 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -212,6 +212,15 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
/**
+ *
+ * @return the state of the async processor.
+ */
+ public boolean isProcessingAsync()
+ {
+ return _processing.get();
+ }
+
+ /**
* Returns all the messages in the Queue
*
* @return List of messages
@@ -821,6 +830,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
addMessageToQueue(msg, deliverFirst);
+ //if we have a non-filtering subscriber but queued messages && we're not Async && we have other Active subs then something is wrong!
+ if ((s != null && hasQueuedMessages()) && !isProcessingAsync() && _subscriptions.hasActiveSubscribers())
+ {
+ _queue.deliverAsync();
+ }
+
//release lock now message is on queue.
_lock.unlock();
@@ -883,7 +898,36 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" +
System.identityHashCode(s) + ") :" + s);
}
- msg.taken(_queue, s);
+
+ if (msg.taken(_queue, s))
+ {
+ //Message has been delivered so don't redeliver.
+ // This can currently occur because of the recursive call below
+ // During unit tests the send can occur
+ // client then rejects
+ // this reject then releases the message by the time the
+ // if(!msg.isTaken()) call is made below
+ // the message has been released so that thread loops to send the message again
+ // of course by the time it gets back to here. the thread that released the
+ // message is now ready to send it. Here is a sample trace for reference
+//1192627162613:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]
+//1192627162613:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}
+//1192627162613:Thread[pool-917-thread-4,5,main]:28398657 Sent :dt:214 msg:(HC:5529738 ID:145 Ref:1)
+//1192627162613:Thread[pool-917-thread-2,5,main]:Reject message by:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]
+//1192627162613:Thread[pool-917-thread-2,5,main]:Releasing Message:(HC:5529738 ID:145 Ref:1)
+//1192627162613:Thread[pool-917-thread-2,5,main]:Msg:Release:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:This:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]}
+//1192627162613:Thread[pool-917-thread-2,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]
+//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:suspended: Message((HC:5529738 ID:145 Ref:1)) has not been taken so recursing!: Subscriber:28398657
+//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]
+//1192627162629:Thread[pool-917-thread-2,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}
+//1192627162629:Thread[pool-917-thread-2,5,main]:25386607 Sent :dt:172 msg:(HC:5529738 ID:145 Ref:1)
+//1192627162629:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=true} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]}
+ // Note: In the last request to take the message from thread 4,5 the message has been
+ // taken by the previous call done by thread 2,5
+
+
+ return;
+ }
//Deliver the message
s.send(msg, _queue);
}
@@ -897,6 +941,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
+ //
+ // Why do we do this? What was the reasoning? We should have a better approach
+ // than recursion and rejecting if someone else sends it before we do.
+ //
if (!msg.isTaken(_queue))
{
if (debugEnabled)
@@ -942,6 +990,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
public void run()
{
+ String startName = Thread.currentThread().getName();
+ Thread.currentThread().setName("CSDM-AsyncDelivery:" + startName);
boolean running = true;
while (running && !_movingMessages.get())
{
@@ -957,6 +1007,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_processing.set(false);
}
}
+ Thread.currentThread().setName(startName);
}
}
@@ -983,8 +1034,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
private String currentStatus()
{
- return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(M:H)") +
- "(" + _messages.size() + ":" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() + ") " +
+ return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(H:M)") +
+ "(" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() +
+ ":" + (_messages.size() - ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize()) + ") " +
" Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") +
"(" + _hasContent.size() + ":" + _extraMessages.get() + ") " +
" Active:" + _subscriptions.hasActiveSubscribers() +
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
index 405c233552..46a68b6a23 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
@@ -54,7 +54,7 @@ public class TxnBuffer
_ops.clear();
}
- private boolean prepare(StoreContext context)
+ private boolean prepare(StoreContext context) throws AMQException
{
for (int i = 0; i < _ops.size(); i++)
{
@@ -63,19 +63,31 @@ public class TxnBuffer
{
op.prepare(context);
}
- catch (Exception e)
+ catch (AMQException e)
{
- //compensate previously prepared ops
- for (int j = 0; j < i; j++)
- {
- _ops.get(j).undoPrepare();
- }
- return false;
+ undoPrepare(i);
+ throw e;
+ }
+ catch (RuntimeException e)
+ {
+ undoPrepare(i);
+ throw e;
}
}
return true;
}
+ private void undoPrepare(int lastPrepared)
+ {
+ //compensate previously prepared ops
+ for (int j = 0; j < lastPrepared; j++)
+ {
+ _ops.get(j).undoPrepare();
+ }
+ }
+
+
+
public void rollback(StoreContext context) throws AMQException
{
for (TxnOp op : _ops)
diff --git a/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java b/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java
index 1d9e30c24e..025f8d89f6 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java
@@ -27,6 +27,7 @@ import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
import java.util.LinkedList;
+import java.util.NoSuchElementException;
public class TxnBufferTest extends TestCase
{
@@ -78,7 +79,16 @@ public class TxnBufferTest extends TestCase
buffer.enlist(new FailedPrepare());
buffer.enlist(new MockOp());
- buffer.commit(null);
+ try
+ {
+ buffer.commit(null);
+
+ }
+ catch (NoSuchElementException e)
+ {
+
+ }
+
validateOps();
store.validate();
}