summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-08-27 15:12:27 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-08-27 15:12:27 +0000
commitc0236702c065538b33d6a621795daaf458eb38a0 (patch)
tree4fefbf680ccd19ac19b2f742c247091891a48775
parentc9f30c4dc9fbdce7a944b1aca5342f420b564b78 (diff)
downloadqpid-python-c0236702c065538b33d6a621795daaf458eb38a0.tar.gz
state change fixes
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@808451 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java17
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java7
4 files changed, 24 insertions, 4 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java
index 895db7b15b..cfe5aedd61 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java
@@ -31,19 +31,28 @@ public abstract class AbstractFlowCreditManager implements FlowCreditManager
public final void addStateListener(FlowCreditManagerListener listener)
{
- _listeners.add(listener);
+ synchronized(_listeners)
+ {
+ _listeners.add(listener);
+ }
}
public final boolean removeListener(FlowCreditManagerListener listener)
{
- return _listeners.remove(listener);
+ synchronized(_listeners)
+ {
+ return _listeners.remove(listener);
+ }
}
private void notifyListeners(final boolean suspended)
{
- for(FlowCreditManagerListener listener : _listeners)
+ synchronized(_listeners)
{
- listener.creditStateChanged(!suspended);
+ for(FlowCreditManagerListener listener : _listeners)
+ {
+ listener.creditStateChanged(!suspended);
+ }
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
index b021fece57..40ee92efea 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
@@ -37,6 +37,8 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl
{
_bytesCredit = bytesCredit;
_messageCredit = messageCredit;
+ setSuspended(!hasCredit());
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
index a3509ee02d..940b89dba9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
@@ -39,6 +39,8 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl
{
_bytesCreditLimit = bytesCreditLimit;
_messageCreditLimit = messageCreditLimit;
+ setSuspended(!hasCredit());
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index 3bf551dd03..999d268181 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -88,6 +88,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
_creditManager = creditManager;
_filters = filters;
_creditManager.addStateListener(this);
+ _state.set(_creditManager.hasCredit() ? State.ACTIVE : State.SUSPENDED);
}
@@ -484,7 +485,13 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
default:
throw new RuntimeException("Unknown message flow mode: " + flowMode);
}
+ if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+ {
+ _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ }
+
_creditManager.addStateListener(this);
+
}
public boolean isStopped()