diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 57 |
1 files changed, 40 insertions, 17 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 733bee2d81..b632c56708 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -196,13 +196,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * The default value for immediate flag used by producers created by this session is false. That is, a consumer does * not need to be attached to a queue. */ - protected static final boolean DEFAULT_IMMEDIATE = false; + protected static final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false")); /** * The default value for mandatory flag used by producers created by this session is true. That is, server will not * silently drop messages where no queue is connected to the exchange for the message. */ - protected static final boolean DEFAULT_MANDATORY = true; + protected static final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true")); /** System property to enable strict AMQP compliance. */ public static final String STRICT_AMQP = "STRICT_AMQP"; @@ -575,12 +575,19 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination destination) throws AMQException { + bindQueue(queueName, routingKey, arguments, exchangeName, destination, false); + } + + public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, + final AMQShortString exchangeName, final AMQDestination destination, + final boolean nowait) throws AMQException + { /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() { public Object execute() throws AMQException, FailoverException { - sendQueueBind(queueName, routingKey, arguments, exchangeName, destination); + sendQueueBind(queueName, routingKey, arguments, exchangeName, destination, nowait); return null; } }, _connection).execute(); @@ -595,7 +602,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName, AMQDestination destination) throws AMQException, FailoverException; + final AMQShortString exchangeName, AMQDestination destination, + final boolean nowait) throws AMQException, FailoverException; /** * Closes the session. @@ -1007,6 +1015,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch (URISyntaxException urlse) { + _logger.error("", urlse); JMSException jmse = new JMSException(urlse.getReason()); jmse.setLinkedException(urlse); @@ -1815,6 +1824,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic void failoverPrep() { startDispatcherIfNecessary(); + syncDispatchQueue(); + } + + void syncDispatchQueue() + { final CountDownLatch signal = new CountDownLatch(1); _queue.add(new Dispatchable() { public void dispatch(AMQSession ssn) @@ -1828,7 +1842,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch (InterruptedException e) { - // pass + throw new RuntimeException(e); } } @@ -1859,6 +1873,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _inRecovery = inRecovery; } + boolean isStarted() + { + return _startedAtLeastOnce.get(); + } + /** * Starts the session, which ensures that it is not suspended and that its event dispatcher is running. * @@ -2281,7 +2300,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * @todo Be aware of possible changes to parameter order as versions change. */ protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean noLocal) + final boolean noLocal) throws AMQException + { + return declareQueue(amqd, protocolHandler, noLocal, false); + } + + protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + final boolean noLocal, final boolean nowait) throws AMQException { /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/ @@ -2296,14 +2321,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic amqd.setQueueName(protocolHandler.generateQueueName()); } - sendQueueDeclare(amqd, protocolHandler); + sendQueueDeclare(amqd, protocolHandler, nowait); return amqd.getAMQQueueName(); } }, _connection).execute(); } - public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException; + public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + final boolean nowait) throws AMQException, FailoverException; /** * Undeclares the specified queue. @@ -2416,14 +2442,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic AMQProtocolHandler protocolHandler = getProtocolHandler(); - declareExchange(amqd, protocolHandler, false); + declareExchange(amqd, protocolHandler, nowait); - AMQShortString queueName = declareQueue(amqd, protocolHandler, consumer.isNoLocal()); + AMQShortString queueName = declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait); // store the consumer queue name consumer.setQueuename(queueName); - bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd); + bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait); // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch if (!_immediatePrefetch) @@ -2455,11 +2481,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { - consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector()); - } - catch (JMSException e) // thrown by getMessageSelector - { - throw new AMQException(null, e.getMessage(), e); + consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer._messageSelector); } catch (FailoverException e) { @@ -2531,8 +2553,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic for (C consumer : consumers) { - consumer.failedOver(); + consumer.failedOverPre(); registerConsumer(consumer, true); + consumer.failedOverPost(); } } |