summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-10-17 16:59:42 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-10-17 16:59:42 +0000
commitea120fa96d784bcca44c92329a3f1598841522fb (patch)
tree2e22d22887d22adbbc5baafaabd2b6578ae1ba2b
parent298e577a501ffcc12e9dd9f35d61a1aa4cbb8928 (diff)
downloadqpid-python-ea120fa96d784bcca44c92329a3f1598841522fb.tar.gz
QPID-647 : Update to ConcurrentSelectorDeliveryManager to restart async process if a msg is queued that has the potential to be delivered.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@585575 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java23
1 files changed, 21 insertions, 2 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 9564225190..d872b50458 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -212,6 +212,15 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
/**
+ *
+ * @return the state of the async processor.
+ */
+ public boolean isProcessingAsync()
+ {
+ return _processing.get();
+ }
+
+ /**
* Returns all the messages in the Queue
*
* @return List of messages
@@ -821,6 +830,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
addMessageToQueue(msg, deliverFirst);
+ //if we have a non-filtering subscriber but queued messages && we're not Async && we have other Active subs then something is wrong!
+ if ((s != null && hasQueuedMessages()) && !isProcessingAsync() && _subscriptions.hasActiveSubscribers())
+ {
+ _queue.deliverAsync();
+ }
+
//release lock now message is on queue.
_lock.unlock();
@@ -975,6 +990,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
public void run()
{
+ String startName = Thread.currentThread().getName();
+ Thread.currentThread().setName("CSDM-AsyncDelivery:" + startName);
boolean running = true;
while (running && !_movingMessages.get())
{
@@ -990,6 +1007,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_processing.set(false);
}
}
+ Thread.currentThread().setName(startName);
}
}
@@ -1016,8 +1034,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
private String currentStatus()
{
- return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(M:H)") +
- "(" + _messages.size() + ":" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() + ") " +
+ return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(H:M)") +
+ "(" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() +
+ ":" + (_messages.size() - ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize()) + ") " +
" Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") +
"(" + _hasContent.size() + ":" + _extraMessages.get() + ") " +
" Active:" + _subscriptions.hasActiveSubscribers() +