summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-11-07 09:09:02 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-11-07 09:09:02 +0000
commitbd0ceeddf6758961d163a1e5352eec3735f5bb8f (patch)
treeb1c5976f483da12ff13cb52198f7a292f1bb5805
parent0faf1c819535eefa3482c32b39dd8611d10b73ef (diff)
downloadqpid-python-bd0ceeddf6758961d163a1e5352eec3735f5bb8f.tar.gz
QPID-5295 : Thread deadlock on creating Sender
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1539569 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java30
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java37
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java7
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java8
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java10
5 files changed, 64 insertions, 28 deletions
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
index e35248f58c..e891c5cbe7 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
@@ -142,31 +142,11 @@ public class Sender implements DeliveryStateHandler
{
_session = session;
+ _windowSize = window;
session.getConnection().checkNotClosed();
configureSource(source);
configureTarget(target);
- _endpoint = session.getEndpoint().createSendingLinkEndpoint(linkName,
- source, target, unsettled);
-
-
- switch(mode)
- {
- case ALO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
- break;
- case AMO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED);
- break;
- case EO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
- break;
-
- }
- _endpoint.setDeliveryStateHandler(this);
- _endpoint.attach();
- _windowSize = window;
+ _endpoint = session.createSendingLinkEndpoint(linkName, target, source, mode, unsettled, this);
synchronized(_endpoint.getLock())
{
@@ -181,10 +161,14 @@ public class Sender implements DeliveryStateHandler
throw new SenderCreationException(e);
}
}
+ if (session.getEndpoint().isEnded())
+ {
+ throw new SenderCreationException("Session is closed while creating link, target: " + target.getAddress());
+ }
if(_endpoint.getTarget()== null)
{
throw new SenderCreationException("Peer did not create remote endpoint for link, target: " + target.getAddress());
- };
+ }
}
_endpoint.setLinkEventListener(new SendingLinkListener.DefaultLinkEventListener()
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
index 1c12907c49..5b9a67503b 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
@@ -24,6 +24,7 @@ import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
+import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
import org.apache.qpid.amqp_1_0.transport.SessionState;
@@ -64,7 +65,7 @@ public class Session
}
- public synchronized Sender createSender(final String targetName)
+ public Sender createSender(final String targetName)
throws Sender.SenderCreationException, ConnectionClosedException
{
@@ -74,7 +75,7 @@ public class Session
}
- public synchronized Sender createSender(final String targetName, final SourceConfigurator configurator)
+ public Sender createSender(final String targetName, final SourceConfigurator configurator)
throws Sender.SenderCreationException, ConnectionClosedException
{
@@ -90,7 +91,7 @@ public class Session
}
- public synchronized Sender createSender(final String targetName, int window)
+ public Sender createSender(final String targetName, int window)
throws Sender.SenderCreationException, ConnectionClosedException
{
final String sourceName = UUID.randomUUID().toString();
@@ -113,6 +114,36 @@ public class Session
}
+ public synchronized SendingLinkEndpoint createSendingLinkEndpoint(final String linkName,
+ final Target target,
+ final Source source,
+ AcknowledgeMode mode,
+ Map<Binary, Outcome> unsettled,
+ final DeliveryStateHandler deliveryStateHandler)
+ {
+ SendingLinkEndpoint link = this.getEndpoint().createSendingLinkEndpoint(linkName, source, target,
+ unsettled, deliveryStateHandler);
+
+ switch(mode)
+ {
+ case ALO:
+ link.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+ link.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
+ break;
+ case AMO:
+ link.setSendingSettlementMode(SenderSettleMode.SETTLED);
+ break;
+ case EO:
+ link.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+ link.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
+ break;
+
+ }
+
+ link.attach();
+
+ return link;
+ }
public Receiver createReceiver(final String sourceAddr) throws ConnectionErrorException
{
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
index a89844afd4..32fffd545a 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
@@ -72,12 +72,17 @@ public abstract class LinkEndpoint<T extends LinkEventListener>
LinkEndpoint(final SessionEndpoint sessionEndpoint, String name, Map<Binary, Outcome> unsettled)
{
+ this(sessionEndpoint, name, unsettled, null);
+ }
+
+ LinkEndpoint(final SessionEndpoint sessionEndpoint, String name, Map<Binary, Outcome> unsettled, DeliveryStateHandler deliveryStateHandler)
+ {
_name = name;
_session = sessionEndpoint;
_linkCredit = UnsignedInteger.valueOf(0);
_drain = Boolean.FALSE;
_localUnsettled = unsettled;
-
+ _deliveryStateHandler = deliveryStateHandler;
}
LinkEndpoint(final SessionEndpoint sessionEndpoint,final Attach attach)
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
index 16e198e957..fb525f0666 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
@@ -49,6 +49,14 @@ public class SendingLinkEndpoint extends LinkEndpoint<SendingLinkListener>
init();
}
+
+ public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, String name, Map<Binary, Outcome> unsettled,
+ DeliveryStateHandler deliveryStateHandler)
+ {
+ super(sessionEndpoint, name, unsettled, deliveryStateHandler);
+ init();
+ }
+
public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, final Attach attach)
{
super(sessionEndpoint, attach);
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
index 7e72ce55a3..f82ca7ee57 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
@@ -507,7 +507,15 @@ public class SessionEndpoint
public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final Source source, final org.apache.qpid.amqp_1_0.type.Target target, Map<Binary, Outcome> unsettled)
{
- SendingLinkEndpoint endpoint = new SendingLinkEndpoint(this, name, unsettled);
+ return createSendingLinkEndpoint(name, source, target, unsettled, null);
+ }
+
+ public SendingLinkEndpoint createSendingLinkEndpoint(final String name, final Source source,
+ final org.apache.qpid.amqp_1_0.type.Target target,
+ Map<Binary, Outcome> unsettled,
+ DeliveryStateHandler deliveryStateHandler)
+ {
+ SendingLinkEndpoint endpoint = new SendingLinkEndpoint(this, name, unsettled, deliveryStateHandler);
endpoint.setSource(source);
endpoint.setTarget(target);
UnsignedInteger handle = findNextAvailableHandle();