summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java13
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java13
2 files changed, 16 insertions, 10 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
index fb525f0666..5dba729b5b 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
@@ -21,15 +21,19 @@
package org.apache.qpid.amqp_1_0.transport;
-import org.apache.qpid.amqp_1_0.type.*;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
import org.apache.qpid.amqp_1_0.type.transport.Attach;
import org.apache.qpid.amqp_1_0.type.transport.Flow;
import org.apache.qpid.amqp_1_0.type.transport.Role;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
-import java.util.HashMap;
-import java.util.Map;
-
public class SendingLinkEndpoint extends LinkEndpoint<SendingLinkListener>
{
@@ -162,7 +166,6 @@ public class SendingLinkEndpoint extends LinkEndpoint<SendingLinkListener>
setLinkCredit(limit.subtract(getDeliveryCount()));
}
}
- getLinkEventListener().flowStateChanged();
}
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
index 5a28ddcb60..d126687e94 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
@@ -456,7 +456,7 @@ public class SessionEndpoint
public void receiveFlow(final Flow flow)
{
-
+ Collection<LinkEndpoint> endpoints = new ArrayList<>();
synchronized(getLock())
{
UnsignedInteger handle = flow.getHandle();
@@ -469,18 +469,21 @@ public class SessionEndpoint
if(endpoint != null)
{
endpoint.receiveFlow( flow );
+ endpoints.add(endpoint);
}
else
{
- for(LinkEndpoint le : _remoteLinkEndpoints.values())
- {
- le.flowStateChanged();
- }
+ endpoints.addAll(_remoteLinkEndpoints.values());
}
getLock().notifyAll();
}
+ for(LinkEndpoint le : endpoints)
+ {
+ le.flowStateChanged();
+ }
+
}