summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java81
1 files changed, 41 insertions, 40 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
index 0f7342f1ab..1ab0b717ab 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
@@ -437,16 +437,21 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
try
{
QpidMessage jmsMessage = MessageFactory.getQpidMessage(message);
- if (_messageListener == null)
+ if (checkPreConditions(jmsMessage))
{
- _queue.offer(jmsMessage);
- }
- else
- {
- // I still think we don't need that additional thread in SessionImpl
- // if the Application blocks on a message thats fine
- // getSession().dispatchMessage(getMessageActorID(), jmsMessage);
- notifyMessageListener(jmsMessage);
+ preApplicationProcessing(jmsMessage);
+
+ if (_messageListener == null)
+ {
+ _queue.offer(jmsMessage);
+ }
+ else
+ {
+ // I still think we don't need that additional thread in SessionImpl
+ // if the Application blocks on a message thats fine
+ // getSession().dispatchMessage(getMessageActorID(), jmsMessage);
+ notifyMessageListener(jmsMessage);
+ }
}
}
catch (Exception e)
@@ -460,40 +465,35 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
{
try
{
- boolean messageOk = checkPreConditions(message);
-
- // only deliver the message if it is valid
- if (messageOk)
+ _messageAsyncrhonouslyReceived++;
+ if (_messageAsyncrhonouslyReceived >= MAX_MESSAGE_TRANSFERRED)
+ {
+ // ask the server for the delivery of MAX_MESSAGE_TRANSFERRED more messages
+ resetAsynchMessageReceived();
+ }
+
+
+ // The JMS specs says:
+ /* The result of a listener throwing a RuntimeException depends on the session?s
+ * acknowledgment mode.
+ ? --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message
+ * will be immediately redelivered. The number of times a JMS provider will
+ * redeliver the same message before giving up is provider-dependent.
+ ? --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered.
+ * --- Transacted Session - the next message for the listener is delivered.
+ *
+ * The number of time we try redelivering the message is 0
+ **/
+ try
{
- _messageAsyncrhonouslyReceived++;
- if (_messageAsyncrhonouslyReceived >= MAX_MESSAGE_TRANSFERRED)
- {
- // ask the server for the delivery of MAX_MESSAGE_TRANSFERRED more messages
- resetAsynchMessageReceived();
- }
- preApplicationProcessing(message);
- // The JMS specs says:
- /* The result of a listener throwing a RuntimeException depends on the session?s
- * acknowledgment mode.
- ? --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message
- * will be immediately redelivered. The number of times a JMS provider will
- * redeliver the same message before giving up is provider-dependent.
- ? --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered.
- * --- Transacted Session - the next message for the listener is delivered.
- *
- * The number of time we try redelivering the message is 0
- **/
- try
- {
-
- _messageListener.onMessage((Message) message);
- }
- catch (RuntimeException re)
- {
- // do nothing as this message will not be redelivered
- }
+ _messageListener.onMessage((Message) message);
}
+ catch (RuntimeException re)
+ {
+ // do nothing as this message will not be redelivered
+ }
+
}
catch (Exception e)
@@ -533,6 +533,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
{
System.out.println("Message not OK, releasing");
releaseMessage(message);
+ return false;
}
}