diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2015-03-09 19:07:29 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2015-03-09 19:07:29 +0000 |
commit | 83a62e932ecf5bbe6be1017a602da468d51a5dfd (patch) | |
tree | 67d9ba0580965820c0cb3d5f38d0dbab3c4998d2 | |
parent | 671f4452b2207171fee99536971d8a075b60d7ad (diff) | |
download | qpid-python-83a62e932ecf5bbe6be1017a602da468d51a5dfd.tar.gz |
QPID-6437 : merged r1664714,1664731,1664839 from trunk to 0.32 branch
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.32@1665326 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 53 insertions, 22 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java index e5a82eb2a3..f9b3bc3c7b 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java @@ -33,6 +33,7 @@ import java.security.Principal; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -148,6 +149,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour private String _localHostname; private boolean _secure; private Principal _externalPrincipal; + private List<Runnable> _postLockActions = new ArrayList<>(); public ConnectionEndpoint(Container container, SaslServerProvider cbs) { @@ -792,22 +794,37 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour _logger = logger; } - public synchronized void receive(final short channel, final Object frame) + public void receive(final short channel, final Object frame) { - if (_logger.isEnabled()) + List<Runnable> postLockActions; + synchronized(this) { - _logger.received(_remoteAddress, channel, frame); - } - if (frame instanceof FrameBody) - { - ((FrameBody) frame).invoke(channel, this); + if (_logger.isEnabled()) + { + _logger.received(_remoteAddress, channel, frame); + } + if (frame instanceof FrameBody) + { + ((FrameBody) frame).invoke(channel, this); + } + else if (frame instanceof SaslFrameBody) + { + ((SaslFrameBody) frame).invoke(this); + } + postLockActions = _postLockActions; + _postLockActions = new ArrayList<>(); } - else if (frame instanceof SaslFrameBody) + for(Runnable action : postLockActions) { - ((SaslFrameBody) frame).invoke(this); + action.run(); } } + synchronized void addPostLockAction(Runnable action) + { + _postLockActions.add(action); + } + public AMQPDescribedTypeRegistry getDescribedTypeRegistry() { return _describedTypeRegistry; diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java index 5dba729b5b..b52e37a18d 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java @@ -166,7 +166,14 @@ public class SendingLinkEndpoint extends LinkEndpoint<SendingLinkListener> setLinkCredit(limit.subtract(getDeliveryCount())); } } - + getSession().getConnection().addPostLockAction(new Runnable() + { + @Override + public void run() + { + flowStateChanged(); + } + }); } @Override diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java index d126687e94..af263d0f45 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java @@ -456,33 +456,40 @@ public class SessionEndpoint public void receiveFlow(final Flow flow) { - Collection<LinkEndpoint> endpoints = new ArrayList<>(); - synchronized(getLock()) + synchronized (getLock()) { UnsignedInteger handle = flow.getHandle(); - LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle); + final LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle); - final UnsignedInteger nextOutgoingId = flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId(); + final UnsignedInteger nextOutgoingId = + flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId(); int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue()); _outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue()); - if(endpoint != null) + if (endpoint != null) { - endpoint.receiveFlow( flow ); - endpoints.add(endpoint); + endpoint.receiveFlow(flow); } else { - endpoints.addAll(_remoteLinkEndpoints.values()); + final Collection<LinkEndpoint> allLinkEndpoints = _remoteLinkEndpoints.values(); + getConnection().addPostLockAction(new Runnable() + { + @Override + public void run() + { + + for(LinkEndpoint le : allLinkEndpoints) + { + le.flowStateChanged(); + } + } + }); } getLock().notifyAll(); } - for(LinkEndpoint le : endpoints) - { - le.flowStateChanged(); - } } |