summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java57
1 files changed, 40 insertions, 17 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 733bee2d81..b632c56708 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -196,13 +196,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* The default value for immediate flag used by producers created by this session is false. That is, a consumer does
* not need to be attached to a queue.
*/
- protected static final boolean DEFAULT_IMMEDIATE = false;
+ protected static final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
/**
* The default value for mandatory flag used by producers created by this session is true. That is, server will not
* silently drop messages where no queue is connected to the exchange for the message.
*/
- protected static final boolean DEFAULT_MANDATORY = true;
+ protected static final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
/** System property to enable strict AMQP compliance. */
public static final String STRICT_AMQP = "STRICT_AMQP";
@@ -575,12 +575,19 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
final AMQShortString exchangeName, final AMQDestination destination) throws AMQException
{
+ bindQueue(queueName, routingKey, arguments, exchangeName, destination, false);
+ }
+
+ public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
+ final AMQShortString exchangeName, final AMQDestination destination,
+ final boolean nowait) throws AMQException
+ {
/*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/
new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
{
public Object execute() throws AMQException, FailoverException
{
- sendQueueBind(queueName, routingKey, arguments, exchangeName, destination);
+ sendQueueBind(queueName, routingKey, arguments, exchangeName, destination, nowait);
return null;
}
}, _connection).execute();
@@ -595,7 +602,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
- final AMQShortString exchangeName, AMQDestination destination) throws AMQException, FailoverException;
+ final AMQShortString exchangeName, AMQDestination destination,
+ final boolean nowait) throws AMQException, FailoverException;
/**
* Closes the session.
@@ -1007,6 +1015,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
catch (URISyntaxException urlse)
{
+ _logger.error("", urlse);
JMSException jmse = new JMSException(urlse.getReason());
jmse.setLinkedException(urlse);
@@ -1815,6 +1824,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
void failoverPrep()
{
startDispatcherIfNecessary();
+ syncDispatchQueue();
+ }
+
+ void syncDispatchQueue()
+ {
final CountDownLatch signal = new CountDownLatch(1);
_queue.add(new Dispatchable() {
public void dispatch(AMQSession ssn)
@@ -1828,7 +1842,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
catch (InterruptedException e)
{
- // pass
+ throw new RuntimeException(e);
}
}
@@ -1859,6 +1873,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_inRecovery = inRecovery;
}
+ boolean isStarted()
+ {
+ return _startedAtLeastOnce.get();
+ }
+
/**
* Starts the session, which ensures that it is not suspended and that its event dispatcher is running.
*
@@ -2281,7 +2300,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* @todo Be aware of possible changes to parameter order as versions change.
*/
protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean noLocal)
+ final boolean noLocal) throws AMQException
+ {
+ return declareQueue(amqd, protocolHandler, noLocal, false);
+ }
+
+ protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+ final boolean noLocal, final boolean nowait)
throws AMQException
{
/*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
@@ -2296,14 +2321,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
amqd.setQueueName(protocolHandler.generateQueueName());
}
- sendQueueDeclare(amqd, protocolHandler);
+ sendQueueDeclare(amqd, protocolHandler, nowait);
return amqd.getAMQQueueName();
}
}, _connection).execute();
}
- public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException;
+ public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+ final boolean nowait) throws AMQException, FailoverException;
/**
* Undeclares the specified queue.
@@ -2416,14 +2442,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
AMQProtocolHandler protocolHandler = getProtocolHandler();
- declareExchange(amqd, protocolHandler, false);
+ declareExchange(amqd, protocolHandler, nowait);
- AMQShortString queueName = declareQueue(amqd, protocolHandler, consumer.isNoLocal());
+ AMQShortString queueName = declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait);
// store the consumer queue name
consumer.setQueuename(queueName);
- bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd);
+ bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait);
// If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
if (!_immediatePrefetch)
@@ -2455,11 +2481,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
- consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector());
- }
- catch (JMSException e) // thrown by getMessageSelector
- {
- throw new AMQException(null, e.getMessage(), e);
+ consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer._messageSelector);
}
catch (FailoverException e)
{
@@ -2531,8 +2553,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
for (C consumer : consumers)
{
- consumer.failedOver();
+ consumer.failedOverPre();
registerConsumer(consumer, true);
+ consumer.failedOverPost();
}
}