diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2009-08-27 15:12:27 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2009-08-27 15:12:27 +0000 |
commit | c0236702c065538b33d6a621795daaf458eb38a0 (patch) | |
tree | 4fefbf680ccd19ac19b2f742c247091891a48775 | |
parent | c9f30c4dc9fbdce7a944b1aca5342f420b564b78 (diff) | |
download | qpid-python-c0236702c065538b33d6a621795daaf458eb38a0.tar.gz |
state change fixes
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@808451 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 24 insertions, 4 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java index 895db7b15b..cfe5aedd61 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java @@ -31,19 +31,28 @@ public abstract class AbstractFlowCreditManager implements FlowCreditManager public final void addStateListener(FlowCreditManagerListener listener) { - _listeners.add(listener); + synchronized(_listeners) + { + _listeners.add(listener); + } } public final boolean removeListener(FlowCreditManagerListener listener) { - return _listeners.remove(listener); + synchronized(_listeners) + { + return _listeners.remove(listener); + } } private void notifyListeners(final boolean suspended) { - for(FlowCreditManagerListener listener : _listeners) + synchronized(_listeners) { - listener.creditStateChanged(!suspended); + for(FlowCreditManagerListener listener : _listeners) + { + listener.creditStateChanged(!suspended); + } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java index b021fece57..40ee92efea 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java @@ -37,6 +37,8 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl { _bytesCredit = bytesCredit; _messageCredit = messageCredit; + setSuspended(!hasCredit()); + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java index a3509ee02d..940b89dba9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java @@ -39,6 +39,8 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl { _bytesCreditLimit = bytesCreditLimit; _messageCreditLimit = messageCreditLimit; + setSuspended(!hasCredit()); + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 3bf551dd03..999d268181 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -88,6 +88,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr _creditManager = creditManager; _filters = filters; _creditManager.addStateListener(this); + _state.set(_creditManager.hasCredit() ? State.ACTIVE : State.SUSPENDED); } @@ -484,7 +485,13 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr default: throw new RuntimeException("Unknown message flow mode: " + flowMode); } + if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) + { + _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); + } + _creditManager.addStateListener(this); + } public boolean isStopped() |