diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-10-17 16:59:42 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-10-17 16:59:42 +0000 |
commit | ea120fa96d784bcca44c92329a3f1598841522fb (patch) | |
tree | 2e22d22887d22adbbc5baafaabd2b6578ae1ba2b | |
parent | 298e577a501ffcc12e9dd9f35d61a1aa4cbb8928 (diff) | |
download | qpid-python-ea120fa96d784bcca44c92329a3f1598841522fb.tar.gz |
QPID-647 : Update to ConcurrentSelectorDeliveryManager to restart async process if a msg is queued that has the potential to be delivered.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@585575 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java | 23 |
1 files changed, 21 insertions, 2 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 9564225190..d872b50458 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -212,6 +212,15 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } /** + * + * @return the state of the async processor. + */ + public boolean isProcessingAsync() + { + return _processing.get(); + } + + /** * Returns all the messages in the Queue * * @return List of messages @@ -821,6 +830,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { addMessageToQueue(msg, deliverFirst); + //if we have a non-filtering subscriber but queued messages && we're not Async && we have other Active subs then something is wrong! + if ((s != null && hasQueuedMessages()) && !isProcessingAsync() && _subscriptions.hasActiveSubscribers()) + { + _queue.deliverAsync(); + } + //release lock now message is on queue. _lock.unlock(); @@ -975,6 +990,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { public void run() { + String startName = Thread.currentThread().getName(); + Thread.currentThread().setName("CSDM-AsyncDelivery:" + startName); boolean running = true; while (running && !_movingMessages.get()) { @@ -990,6 +1007,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _processing.set(false); } } + Thread.currentThread().setName(startName); } } @@ -1016,8 +1034,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager private String currentStatus() { - return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(M:H)") + - "(" + _messages.size() + ":" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() + ") " + + return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(H:M)") + + "(" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() + + ":" + (_messages.size() - ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize()) + ") " + " Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") + "(" + _hasContent.size() + ":" + _extraMessages.get() + ") " + " Active:" + _subscriptions.hasActiveSubscribers() + |