diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-10-18 09:09:38 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-10-18 09:09:38 +0000 |
commit | 5e6cb3bf5178acf5ab84e9b2478007d0a86c6459 (patch) | |
tree | 2cff38ed8b55d2d0fdb907fbe1630bcde32cf0a3 | |
parent | da48691cb4f28fc5b1f4ce4eb1d557c1f752ad44 (diff) | |
download | qpid-python-5e6cb3bf5178acf5ab84e9b2478007d0a86c6459.tar.gz |
QPID-647 : Async Process start/stop is not regulated tightly enough. Added additional synchronisation to ensure that a new subscriber can start the async if required. As currently the start request can be missed.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@585906 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java | 37 |
1 files changed, 20 insertions, 17 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index d872b50458..71e50b178c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -211,10 +211,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - /** - * - * @return the state of the async processor. - */ + /** @return the state of the async processor. */ public boolean isProcessingAsync() { return _processing.get(); @@ -830,14 +827,15 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { addMessageToQueue(msg, deliverFirst); + //release lock now message is on queue. + _lock.unlock(); + //if we have a non-filtering subscriber but queued messages && we're not Async && we have other Active subs then something is wrong! if ((s != null && hasQueuedMessages()) && !isProcessingAsync() && _subscriptions.hasActiveSubscribers()) { _queue.deliverAsync(); } - //release lock now message is on queue. - _lock.unlock(); //Pre Deliver to all subscriptions if (debugEnabled) @@ -984,7 +982,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return id; } - Runner asyncDelivery = new Runner(); + final Runner _asyncDelivery = new Runner(); private class Runner implements Runnable { @@ -1000,11 +998,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //Check that messages have not been added since we did our last peek(); // Synchronize with the thread that adds to the queue. // If the queue is still empty then we can exit - - if (!(hasQueuedMessages() && _subscriptions.hasActiveSubscribers())) + synchronized (_asyncDelivery) { - running = false; - _processing.set(false); + if (!(hasQueuedMessages() && _subscriptions.hasActiveSubscribers())) + { + running = false; + _processing.set(false); + } } } Thread.currentThread().setName(startName); @@ -1018,16 +1018,19 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _log.debug(debugIdentity() + "Processing Async." + currentStatus()); } - if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers()) + synchronized (_asyncDelivery) { - //are we already running? if so, don't re-run - if (_processing.compareAndSet(false, true)) + if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers()) { - if (_log.isDebugEnabled()) + //are we already running? if so, don't re-run + if (_processing.compareAndSet(false, true)) { - _log.debug(debugIdentity() + "Executing Async process."); + if (_log.isDebugEnabled()) + { + _log.debug(debugIdentity() + "Executing Async process."); + } + executor.execute(_asyncDelivery); } - executor.execute(asyncDelivery); } } } |