diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2015-03-06 22:06:25 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2015-03-06 22:06:25 +0000 |
commit | f6f01711bbe15b8abbd9e7a0414234efd8831c50 (patch) | |
tree | 02d52643dd46f92e49c09251b8a4e4a1829d8f9d | |
parent | d2f82c0956f4e9dda9f7ff111b50c69436eb629b (diff) | |
download | qpid-python-f6f01711bbe15b8abbd9e7a0414234efd8831c50.tar.gz |
QPID-6347 : ensure link enpoints are notified outside of holding the connection lock
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1664731 13f79535-47bb-0310-9956-ffa450edef68
2 files changed, 66 insertions, 38 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java index f4484858a2..f9b3bc3c7b 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java @@ -33,6 +33,7 @@ import java.security.Principal; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -148,6 +149,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour private String _localHostname; private boolean _secure; private Principal _externalPrincipal; + private List<Runnable> _postLockActions = new ArrayList<>(); public ConnectionEndpoint(Container container, SaslServerProvider cbs) { @@ -663,14 +665,9 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour } } - public void receiveFlow(short channel, Flow flow) + public synchronized void receiveFlow(short channel, Flow flow) { - SessionEndpoint endPoint; - synchronized (this) - { - endPoint = getSession(channel); - } - + SessionEndpoint endPoint = getSession(channel); if (endPoint != null) { endPoint.receiveFlow(flow); @@ -797,22 +794,37 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour _logger = logger; } - public synchronized void receive(final short channel, final Object frame) + public void receive(final short channel, final Object frame) { - if (_logger.isEnabled()) + List<Runnable> postLockActions; + synchronized(this) { - _logger.received(_remoteAddress, channel, frame); - } - if (frame instanceof FrameBody) - { - ((FrameBody) frame).invoke(channel, this); + if (_logger.isEnabled()) + { + _logger.received(_remoteAddress, channel, frame); + } + if (frame instanceof FrameBody) + { + ((FrameBody) frame).invoke(channel, this); + } + else if (frame instanceof SaslFrameBody) + { + ((SaslFrameBody) frame).invoke(this); + } + postLockActions = _postLockActions; + _postLockActions = new ArrayList<>(); } - else if (frame instanceof SaslFrameBody) + for(Runnable action : postLockActions) { - ((SaslFrameBody) frame).invoke(this); + action.run(); } } + synchronized void addPostLockAction(Runnable action) + { + _postLockActions.add(action); + } + public AMQPDescribedTypeRegistry getDescribedTypeRegistry() { return _describedTypeRegistry; diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java index d126687e94..28b84681d1 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java @@ -456,32 +456,48 @@ public class SessionEndpoint public void receiveFlow(final Flow flow) { - Collection<LinkEndpoint> endpoints = new ArrayList<>(); - synchronized(getLock()) + synchronized (getLock()) { - UnsignedInteger handle = flow.getHandle(); - LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle); - - final UnsignedInteger nextOutgoingId = flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId(); - int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue()); - _outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue()); - - if(endpoint != null) - { - endpoint.receiveFlow( flow ); - endpoints.add(endpoint); - } - else + synchronized (getLock()) { - endpoints.addAll(_remoteLinkEndpoints.values()); - } + UnsignedInteger handle = flow.getHandle(); + final LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle); - getLock().notifyAll(); - } + final UnsignedInteger nextOutgoingId = + flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId(); + int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue()); + _outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue()); - for(LinkEndpoint le : endpoints) - { - le.flowStateChanged(); + if (endpoint != null) + { + getConnection().addPostLockAction(new Runnable() + { + @Override + public void run() + { + endpoint.receiveFlow(flow); + } + }); + } + else + { + final Collection<LinkEndpoint> allLinkEndpoints = _remoteLinkEndpoints.values(); + getConnection().addPostLockAction(new Runnable() + { + @Override + public void run() + { + + for(LinkEndpoint le : allLinkEndpoints) + { + le.flowStateChanged(); + } + } + }); + } + + getLock().notifyAll(); + } } |