summaryrefslogtreecommitdiff
path: root/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java')
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java78
1 files changed, 54 insertions, 24 deletions
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
index 5e1e1b1d7c..182d904a9c 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
@@ -60,12 +60,14 @@ public class Session
}
- public synchronized Sender createSender(final String targetName) throws Sender.SenderCreationException
+ public synchronized Sender createSender(final String targetName)
+ throws Sender.SenderCreationException, ConnectionClosedException
{
return createSender(targetName, false);
}
- public synchronized Sender createSender(final String targetName, boolean synchronous) throws Sender.SenderCreationException
+ public synchronized Sender createSender(final String targetName, boolean synchronous)
+ throws Sender.SenderCreationException, ConnectionClosedException
{
final String sourceName = UUID.randomUUID().toString();
@@ -73,30 +75,35 @@ public class Session
}
- public synchronized Sender createSender(final String targetName, int window) throws Sender.SenderCreationException
+ public synchronized Sender createSender(final String targetName, int window)
+ throws Sender.SenderCreationException, ConnectionClosedException
{
final String sourceName = UUID.randomUUID().toString();
return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, window);
}
- public Sender createSender(String targetName, int window, AcknowledgeMode mode) throws Sender.SenderCreationException
+ public Sender createSender(String targetName, int window, AcknowledgeMode mode)
+ throws Sender.SenderCreationException, ConnectionClosedException
{
return createSender(targetName, window, mode, null);
}
- public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName) throws Sender.SenderCreationException
+ public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName)
+ throws Sender.SenderCreationException, ConnectionClosedException
{
return createSender(targetName, window, mode, linkName, null);
}
- public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName, Map<Binary, Outcome> unsettled) throws Sender.SenderCreationException
+ public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName, Map<Binary, Outcome> unsettled)
+ throws Sender.SenderCreationException, ConnectionClosedException
{
return createSender(targetName, window, mode, linkName, false, unsettled);
}
public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName,
- boolean isDurable, Map<Binary, Outcome> unsettled) throws Sender.SenderCreationException
+ boolean isDurable, Map<Binary, Outcome> unsettled)
+ throws Sender.SenderCreationException, ConnectionClosedException
{
return new Sender(this, linkName == null ? "->" + targetName + '(' + UUID.randomUUID().toString()+')': linkName,
targetName, null, window, mode, isDurable, unsettled);
@@ -104,79 +111,84 @@ public class Session
}
- public Receiver createReceiver(final String sourceAddr) throws AmqpErrorException
+ public Receiver createReceiver(final String sourceAddr) throws ConnectionErrorException
{
return createReceiver(sourceAddr, null, AcknowledgeMode.ALO);
}
- public Receiver createReceiver(final String queue, final AcknowledgeMode mode) throws AmqpErrorException
+ public Receiver createReceiver(final String queue, final AcknowledgeMode mode)
+ throws ConnectionErrorException
{
return createReceiver(queue, null, mode);
}
public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName)
- throws AmqpErrorException
+ throws ConnectionErrorException
{
return createReceiver(queue, null, mode, linkName);
}
public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable)
- throws AmqpErrorException
+ throws ConnectionErrorException
{
return createReceiver(queue, null, mode, linkName, isDurable);
}
public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable,
Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
- throws AmqpErrorException
+ throws ConnectionErrorException
{
return createReceiver(queue, null, mode, linkName, isDurable, filters, unsettled);
}
public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName,
- boolean isDurable, Map<Binary, Outcome> unsettled) throws AmqpErrorException
+ boolean isDurable, Map<Binary, Outcome> unsettled)
+ throws ConnectionErrorException
{
return createReceiver(queue, null, mode, linkName, isDurable, unsettled);
}
private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode)
- throws AmqpErrorException
+ throws ConnectionErrorException
{
return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO);
}
private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, String linkName)
- throws AmqpErrorException
+ throws ConnectionErrorException
{
return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO, linkName);
}
private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
- final AcknowledgeMode ackMode) throws AmqpErrorException
+ final AcknowledgeMode ackMode)
+ throws ConnectionErrorException
{
return createReceiver(sourceAddr, mode, ackMode, null);
}
private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
- final AcknowledgeMode ackMode, String linkName) throws AmqpErrorException
+ final AcknowledgeMode ackMode, String linkName)
+ throws ConnectionErrorException
{
return createReceiver(sourceAddr,mode, ackMode, linkName, false);
}
private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
final AcknowledgeMode ackMode, String linkName, boolean isDurable)
- throws AmqpErrorException
+ throws ConnectionErrorException
{
return createReceiver(sourceAddr, mode, ackMode, linkName, isDurable, null);
}
private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
final AcknowledgeMode ackMode, String linkName, boolean isDurable,
- Map<Binary, Outcome> unsettled) throws AmqpErrorException
+ Map<Binary, Outcome> unsettled)
+ throws ConnectionErrorException
{
return createReceiver(sourceAddr,mode,ackMode, linkName, isDurable, null, unsettled);
}
@@ -184,7 +196,7 @@ public class Session
public synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
final AcknowledgeMode ackMode, String linkName, boolean isDurable,
Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
- throws AmqpErrorException
+ throws ConnectionErrorException
{
final Target target = new Target();
@@ -207,17 +219,17 @@ public class Session
}
- public synchronized Receiver createCopyingReceiver(final String sourceAddr) throws AmqpErrorException
+ public synchronized Receiver createCopyingReceiver(final String sourceAddr) throws ConnectionErrorException
{
return createReceiver(sourceAddr, StdDistMode.COPY);
}
- public synchronized Receiver createMovingReceiver(final String sourceAddr) throws AmqpErrorException
+ public synchronized Receiver createMovingReceiver(final String sourceAddr) throws ConnectionErrorException
{
return createReceiver(sourceAddr, StdDistMode.MOVE);
}
- public Receiver createTemporaryQueueReceiver() throws AmqpErrorException
+ public Receiver createTemporaryQueueReceiver() throws AmqpErrorException, ConnectionErrorException
{
Source source = new Source();
source.setDynamic(true);
@@ -228,7 +240,7 @@ public class Session
return receiver;
}
- public Sender createTemporaryQueueSender() throws Sender.SenderCreationException
+ public Sender createTemporaryQueueSender() throws Sender.SenderCreationException, ConnectionClosedException
{
Target target = new Target();
target.setDynamic(true);
@@ -351,4 +363,22 @@ public class Session
{
return _connection;
}
+
+ public void awaitActive()
+ {
+ synchronized(getEndpoint().getLock())
+ {
+ while(!getEndpoint().isEnded() && !getEndpoint().isActive())
+ {
+ try
+ {
+ getEndpoint().getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+ }
}