summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-03-06 22:06:25 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-03-06 22:06:25 +0000
commitf6f01711bbe15b8abbd9e7a0414234efd8831c50 (patch)
tree02d52643dd46f92e49c09251b8a4e4a1829d8f9d
parentd2f82c0956f4e9dda9f7ff111b50c69436eb629b (diff)
downloadqpid-python-f6f01711bbe15b8abbd9e7a0414234efd8831c50.tar.gz
QPID-6347 : ensure link enpoints are notified outside of holding the connection lock
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1664731 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java44
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java60
2 files changed, 66 insertions, 38 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
index f4484858a2..f9b3bc3c7b 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
@@ -33,6 +33,7 @@ import java.security.Principal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -148,6 +149,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
private String _localHostname;
private boolean _secure;
private Principal _externalPrincipal;
+ private List<Runnable> _postLockActions = new ArrayList<>();
public ConnectionEndpoint(Container container, SaslServerProvider cbs)
{
@@ -663,14 +665,9 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
}
}
- public void receiveFlow(short channel, Flow flow)
+ public synchronized void receiveFlow(short channel, Flow flow)
{
- SessionEndpoint endPoint;
- synchronized (this)
- {
- endPoint = getSession(channel);
- }
-
+ SessionEndpoint endPoint = getSession(channel);
if (endPoint != null)
{
endPoint.receiveFlow(flow);
@@ -797,22 +794,37 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
_logger = logger;
}
- public synchronized void receive(final short channel, final Object frame)
+ public void receive(final short channel, final Object frame)
{
- if (_logger.isEnabled())
+ List<Runnable> postLockActions;
+ synchronized(this)
{
- _logger.received(_remoteAddress, channel, frame);
- }
- if (frame instanceof FrameBody)
- {
- ((FrameBody) frame).invoke(channel, this);
+ if (_logger.isEnabled())
+ {
+ _logger.received(_remoteAddress, channel, frame);
+ }
+ if (frame instanceof FrameBody)
+ {
+ ((FrameBody) frame).invoke(channel, this);
+ }
+ else if (frame instanceof SaslFrameBody)
+ {
+ ((SaslFrameBody) frame).invoke(this);
+ }
+ postLockActions = _postLockActions;
+ _postLockActions = new ArrayList<>();
}
- else if (frame instanceof SaslFrameBody)
+ for(Runnable action : postLockActions)
{
- ((SaslFrameBody) frame).invoke(this);
+ action.run();
}
}
+ synchronized void addPostLockAction(Runnable action)
+ {
+ _postLockActions.add(action);
+ }
+
public AMQPDescribedTypeRegistry getDescribedTypeRegistry()
{
return _describedTypeRegistry;
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
index d126687e94..28b84681d1 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
@@ -456,32 +456,48 @@ public class SessionEndpoint
public void receiveFlow(final Flow flow)
{
- Collection<LinkEndpoint> endpoints = new ArrayList<>();
- synchronized(getLock())
+ synchronized (getLock())
{
- UnsignedInteger handle = flow.getHandle();
- LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle);
-
- final UnsignedInteger nextOutgoingId = flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
- int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue());
- _outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue());
-
- if(endpoint != null)
- {
- endpoint.receiveFlow( flow );
- endpoints.add(endpoint);
- }
- else
+ synchronized (getLock())
{
- endpoints.addAll(_remoteLinkEndpoints.values());
- }
+ UnsignedInteger handle = flow.getHandle();
+ final LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle);
- getLock().notifyAll();
- }
+ final UnsignedInteger nextOutgoingId =
+ flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
+ int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue());
+ _outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue());
- for(LinkEndpoint le : endpoints)
- {
- le.flowStateChanged();
+ if (endpoint != null)
+ {
+ getConnection().addPostLockAction(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ endpoint.receiveFlow(flow);
+ }
+ });
+ }
+ else
+ {
+ final Collection<LinkEndpoint> allLinkEndpoints = _remoteLinkEndpoints.values();
+ getConnection().addPostLockAction(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+
+ for(LinkEndpoint le : allLinkEndpoints)
+ {
+ le.flowStateChanged();
+ }
+ }
+ });
+ }
+
+ getLock().notifyAll();
+ }
}