diff options
-rw-r--r-- | java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java | 81 |
1 files changed, 41 insertions, 40 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index 0f7342f1ab..1ab0b717ab 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -437,16 +437,21 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer try { QpidMessage jmsMessage = MessageFactory.getQpidMessage(message); - if (_messageListener == null) + if (checkPreConditions(jmsMessage)) { - _queue.offer(jmsMessage); - } - else - { - // I still think we don't need that additional thread in SessionImpl - // if the Application blocks on a message thats fine - // getSession().dispatchMessage(getMessageActorID(), jmsMessage); - notifyMessageListener(jmsMessage); + preApplicationProcessing(jmsMessage); + + if (_messageListener == null) + { + _queue.offer(jmsMessage); + } + else + { + // I still think we don't need that additional thread in SessionImpl + // if the Application blocks on a message thats fine + // getSession().dispatchMessage(getMessageActorID(), jmsMessage); + notifyMessageListener(jmsMessage); + } } } catch (Exception e) @@ -460,40 +465,35 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { try { - boolean messageOk = checkPreConditions(message); - - // only deliver the message if it is valid - if (messageOk) + _messageAsyncrhonouslyReceived++; + if (_messageAsyncrhonouslyReceived >= MAX_MESSAGE_TRANSFERRED) + { + // ask the server for the delivery of MAX_MESSAGE_TRANSFERRED more messages + resetAsynchMessageReceived(); + } + + + // The JMS specs says: + /* The result of a listener throwing a RuntimeException depends on the session?s + * acknowledgment mode. + ? --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message + * will be immediately redelivered. The number of times a JMS provider will + * redeliver the same message before giving up is provider-dependent. + ? --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered. + * --- Transacted Session - the next message for the listener is delivered. + * + * The number of time we try redelivering the message is 0 + **/ + try { - _messageAsyncrhonouslyReceived++; - if (_messageAsyncrhonouslyReceived >= MAX_MESSAGE_TRANSFERRED) - { - // ask the server for the delivery of MAX_MESSAGE_TRANSFERRED more messages - resetAsynchMessageReceived(); - } - preApplicationProcessing(message); - // The JMS specs says: - /* The result of a listener throwing a RuntimeException depends on the session?s - * acknowledgment mode. - ? --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message - * will be immediately redelivered. The number of times a JMS provider will - * redeliver the same message before giving up is provider-dependent. - ? --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered. - * --- Transacted Session - the next message for the listener is delivered. - * - * The number of time we try redelivering the message is 0 - **/ - try - { - - _messageListener.onMessage((Message) message); - } - catch (RuntimeException re) - { - // do nothing as this message will not be redelivered - } + _messageListener.onMessage((Message) message); } + catch (RuntimeException re) + { + // do nothing as this message will not be redelivered + } + } catch (Exception e) @@ -533,6 +533,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { System.out.println("Message not OK, releasing"); releaseMessage(message); + return false; } } |