diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2013-11-07 09:09:02 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2013-11-07 09:09:02 +0000 |
commit | bd0ceeddf6758961d163a1e5352eec3735f5bb8f (patch) | |
tree | b1c5976f483da12ff13cb52198f7a292f1bb5805 | |
parent | 0faf1c819535eefa3482c32b39dd8611d10b73ef (diff) | |
download | qpid-python-bd0ceeddf6758961d163a1e5352eec3735f5bb8f.tar.gz |
QPID-5295 : Thread deadlock on creating Sender
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1539569 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 64 insertions, 28 deletions
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java index e35248f58c..e891c5cbe7 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java @@ -142,31 +142,11 @@ public class Sender implements DeliveryStateHandler { _session = session; + _windowSize = window; session.getConnection().checkNotClosed(); configureSource(source); configureTarget(target); - _endpoint = session.getEndpoint().createSendingLinkEndpoint(linkName, - source, target, unsettled); - - - switch(mode) - { - case ALO: - _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED); - _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST); - break; - case AMO: - _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED); - break; - case EO: - _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED); - _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND); - break; - - } - _endpoint.setDeliveryStateHandler(this); - _endpoint.attach(); - _windowSize = window; + _endpoint = session.createSendingLinkEndpoint(linkName, target, source, mode, unsettled, this); synchronized(_endpoint.getLock()) { @@ -181,10 +161,14 @@ public class Sender implements DeliveryStateHandler throw new SenderCreationException(e); } } + if (session.getEndpoint().isEnded()) + { + throw new SenderCreationException("Session is closed while creating link, target: " + target.getAddress()); + } if(_endpoint.getTarget()== null) { throw new SenderCreationException("Peer did not create remote endpoint for link, target: " + target.getAddress()); - }; + } } _endpoint.setLinkEventListener(new SendingLinkListener.DefaultLinkEventListener() diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java index 1c12907c49..5b9a67503b 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java @@ -24,6 +24,7 @@ import org.apache.qpid.amqp_1_0.messaging.SectionDecoder; import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl; import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl; +import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; import org.apache.qpid.amqp_1_0.transport.SessionEndpoint; import org.apache.qpid.amqp_1_0.transport.SessionState; @@ -64,7 +65,7 @@ public class Session } - public synchronized Sender createSender(final String targetName) + public Sender createSender(final String targetName) throws Sender.SenderCreationException, ConnectionClosedException { @@ -74,7 +75,7 @@ public class Session } - public synchronized Sender createSender(final String targetName, final SourceConfigurator configurator) + public Sender createSender(final String targetName, final SourceConfigurator configurator) throws Sender.SenderCreationException, ConnectionClosedException { @@ -90,7 +91,7 @@ public class Session } - public synchronized Sender createSender(final String targetName, int window) + public Sender createSender(final String targetName, int window) throws Sender.SenderCreationException, ConnectionClosedException { final String sourceName = UUID.randomUUID().toString(); @@ -113,6 +114,36 @@ public class Session } + public synchronized SendingLinkEndpoint createSendingLinkEndpoint(final String linkName, + final Target target, + final Source source, + AcknowledgeMode mode, + Map<Binary, Outcome> unsettled, + final DeliveryStateHandler deliveryStateHandler) + { + SendingLinkEndpoint link = this.getEndpoint().createSendingLinkEndpoint(linkName, source, target, + unsettled, deliveryStateHandler); + + switch(mode) + { + case ALO: + link.setSendingSettlementMode(SenderSettleMode.UNSETTLED); + link.setReceivingSettlementMode(ReceiverSettleMode.FIRST); + break; + case AMO: + link.setSendingSettlementMode(SenderSettleMode.SETTLED); + break; + case EO: + link.setSendingSettlementMode(SenderSettleMode.UNSETTLED); + link.setReceivingSettlementMode(ReceiverSettleMode.SECOND); + break; + + } + + link.attach(); + + return link; + } public Receiver createReceiver(final String sourceAddr) throws ConnectionErrorException { diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java index a89844afd4..32fffd545a 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java @@ -72,12 +72,17 @@ public abstract class LinkEndpoint<T extends LinkEventListener> LinkEndpoint(final SessionEndpoint sessionEndpoint, String name, Map<Binary, Outcome> unsettled) { + this(sessionEndpoint, name, unsettled, null); + } + + LinkEndpoint(final SessionEndpoint sessionEndpoint, String name, Map<Binary, Outcome> unsettled, DeliveryStateHandler deliveryStateHandler) + { _name = name; _session = sessionEndpoint; _linkCredit = UnsignedInteger.valueOf(0); _drain = Boolean.FALSE; _localUnsettled = unsettled; - + _deliveryStateHandler = deliveryStateHandler; } LinkEndpoint(final SessionEndpoint sessionEndpoint,final Attach attach) diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java index 16e198e957..fb525f0666 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java @@ -49,6 +49,14 @@ public class SendingLinkEndpoint extends LinkEndpoint<SendingLinkListener> init(); } + + public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, String name, Map<Binary, Outcome> unsettled, + DeliveryStateHandler deliveryStateHandler) + { + super(sessionEndpoint, name, unsettled, deliveryStateHandler); + init(); + } + public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, final Attach attach) { super(sessionEndpoint, attach); diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java index 7e72ce55a3..f82ca7ee57 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java @@ -507,7 +507,15 @@ public class SessionEndpoint public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final Source source, final org.apache.qpid.amqp_1_0.type.Target target, Map<Binary, Outcome> unsettled) { - SendingLinkEndpoint endpoint = new SendingLinkEndpoint(this, name, unsettled); + return createSendingLinkEndpoint(name, source, target, unsettled, null); + } + + public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final Source source, + final org.apache.qpid.amqp_1_0.type.Target target, + Map<Binary, Outcome> unsettled, + DeliveryStateHandler deliveryStateHandler) + { + SendingLinkEndpoint endpoint = new SendingLinkEndpoint(this, name, unsettled, deliveryStateHandler); endpoint.setSource(source); endpoint.setTarget(target); UnsignedInteger handle = findNextAvailableHandle(); |