diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2015-03-07 12:40:26 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2015-03-07 12:40:26 +0000 |
commit | 5cdfd4cb313381431ab66376758b10c662f5b374 (patch) | |
tree | 8d0a584d6be7d5ab338a59e7aaefd4a9b261d8bf | |
parent | f6f01711bbe15b8abbd9e7a0414234efd8831c50 (diff) | |
download | qpid-python-5cdfd4cb313381431ab66376758b10c662f5b374.tar.gz |
QPID-6437 : ensure session/link flow notifies occur
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1664839 13f79535-47bb-0310-9956-ffa450edef68
2 files changed, 32 insertions, 34 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java index 5dba729b5b..b52e37a18d 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java @@ -166,7 +166,14 @@ public class SendingLinkEndpoint extends LinkEndpoint<SendingLinkListener> setLinkCredit(limit.subtract(getDeliveryCount())); } } - + getSession().getConnection().addPostLockAction(new Runnable() + { + @Override + public void run() + { + flowStateChanged(); + } + }); } @Override diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java index 28b84681d1..af263d0f45 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java @@ -458,49 +458,40 @@ public class SessionEndpoint { synchronized (getLock()) { - synchronized (getLock()) - { - UnsignedInteger handle = flow.getHandle(); - final LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle); + UnsignedInteger handle = flow.getHandle(); + final LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle); - final UnsignedInteger nextOutgoingId = - flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId(); - int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue()); - _outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue()); + final UnsignedInteger nextOutgoingId = + flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId(); + int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue()); + _outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue()); - if (endpoint != null) - { - getConnection().addPostLockAction(new Runnable() - { - @Override - public void run() - { - endpoint.receiveFlow(flow); - } - }); - } - else + if (endpoint != null) + { + endpoint.receiveFlow(flow); + } + else + { + final Collection<LinkEndpoint> allLinkEndpoints = _remoteLinkEndpoints.values(); + getConnection().addPostLockAction(new Runnable() { - final Collection<LinkEndpoint> allLinkEndpoints = _remoteLinkEndpoints.values(); - getConnection().addPostLockAction(new Runnable() + @Override + public void run() { - @Override - public void run() - { - for(LinkEndpoint le : allLinkEndpoints) - { - le.flowStateChanged(); - } + for(LinkEndpoint le : allLinkEndpoints) + { + le.flowStateChanged(); } - }); - } - - getLock().notifyAll(); + } + }); } + + getLock().notifyAll(); } + } public void receiveDisposition(final Disposition disposition) |