summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-03-07 12:40:26 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-03-07 12:40:26 +0000
commit5cdfd4cb313381431ab66376758b10c662f5b374 (patch)
tree8d0a584d6be7d5ab338a59e7aaefd4a9b261d8bf
parentf6f01711bbe15b8abbd9e7a0414234efd8831c50 (diff)
downloadqpid-python-5cdfd4cb313381431ab66376758b10c662f5b374.tar.gz
QPID-6437 : ensure session/link flow notifies occur
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1664839 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java9
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java57
2 files changed, 32 insertions, 34 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
index 5dba729b5b..b52e37a18d 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
@@ -166,7 +166,14 @@ public class SendingLinkEndpoint extends LinkEndpoint<SendingLinkListener>
setLinkCredit(limit.subtract(getDeliveryCount()));
}
}
-
+ getSession().getConnection().addPostLockAction(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ flowStateChanged();
+ }
+ });
}
@Override
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
index 28b84681d1..af263d0f45 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
@@ -458,49 +458,40 @@ public class SessionEndpoint
{
synchronized (getLock())
{
- synchronized (getLock())
- {
- UnsignedInteger handle = flow.getHandle();
- final LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle);
+ UnsignedInteger handle = flow.getHandle();
+ final LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle);
- final UnsignedInteger nextOutgoingId =
- flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
- int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue());
- _outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue());
+ final UnsignedInteger nextOutgoingId =
+ flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
+ int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue());
+ _outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue());
- if (endpoint != null)
- {
- getConnection().addPostLockAction(new Runnable()
- {
- @Override
- public void run()
- {
- endpoint.receiveFlow(flow);
- }
- });
- }
- else
+ if (endpoint != null)
+ {
+ endpoint.receiveFlow(flow);
+ }
+ else
+ {
+ final Collection<LinkEndpoint> allLinkEndpoints = _remoteLinkEndpoints.values();
+ getConnection().addPostLockAction(new Runnable()
{
- final Collection<LinkEndpoint> allLinkEndpoints = _remoteLinkEndpoints.values();
- getConnection().addPostLockAction(new Runnable()
+ @Override
+ public void run()
{
- @Override
- public void run()
- {
- for(LinkEndpoint le : allLinkEndpoints)
- {
- le.flowStateChanged();
- }
+ for(LinkEndpoint le : allLinkEndpoints)
+ {
+ le.flowStateChanged();
}
- });
- }
-
- getLock().notifyAll();
+ }
+ });
}
+
+ getLock().notifyAll();
}
+
}
public void receiveDisposition(final Disposition disposition)