diff options
2 files changed, 32 insertions, 34 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java index 5dba729b5b..b52e37a18d 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java @@ -166,7 +166,14 @@ public class SendingLinkEndpoint extends LinkEndpoint<SendingLinkListener> setLinkCredit(limit.subtract(getDeliveryCount())); } } - + getSession().getConnection().addPostLockAction(new Runnable() + { + @Override + public void run() + { + flowStateChanged(); + } + }); } @Override diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java index 28b84681d1..af263d0f45 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java @@ -458,49 +458,40 @@ public class SessionEndpoint { synchronized (getLock()) { - synchronized (getLock()) - { - UnsignedInteger handle = flow.getHandle(); - final LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle); + UnsignedInteger handle = flow.getHandle(); + final LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle); - final UnsignedInteger nextOutgoingId = - flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId(); - int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue()); - _outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue()); + final UnsignedInteger nextOutgoingId = + flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId(); + int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue()); + _outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue()); - if (endpoint != null) - { - getConnection().addPostLockAction(new Runnable() - { - @Override - public void run() - { - endpoint.receiveFlow(flow); - } - }); - } - else + if (endpoint != null) + { + endpoint.receiveFlow(flow); + } + else + { + final Collection<LinkEndpoint> allLinkEndpoints = _remoteLinkEndpoints.values(); + getConnection().addPostLockAction(new Runnable() { - final Collection<LinkEndpoint> allLinkEndpoints = _remoteLinkEndpoints.values(); - getConnection().addPostLockAction(new Runnable() + @Override + public void run() { - @Override - public void run() - { - for(LinkEndpoint le : allLinkEndpoints) - { - le.flowStateChanged(); - } + for(LinkEndpoint le : allLinkEndpoints) + { + le.flowStateChanged(); } - }); - } - - getLock().notifyAll(); + } + }); } + + getLock().notifyAll(); } + } public void receiveDisposition(final Disposition disposition) |