From c3d7af006982d495025164d4b1c05c1e670cb148 Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Wed, 15 Dec 2010 16:07:08 +0000 Subject: QPID-2980: expose the delivery count as a property git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1049624 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/client/BasicMessageConsumer.java | 8 +++- .../qpid/client/BasicMessageProducer_0_10.java | 3 ++ .../qpid/client/BasicMessageProducer_0_8.java | 3 ++ .../org/apache/qpid/client/CustomJMSXProperty.java | 1 + .../apache/qpid/client/DeliveryCountTracker.java | 56 +++++++++++----------- .../qpid/client/message/AMQMessageDelegate.java | 9 ++++ .../client/message/AMQMessageDelegate_0_10.java | 14 ++++++ .../client/message/AMQMessageDelegate_0_8.java | 12 +++++ .../qpid/client/message/AbstractJMSMessage.java | 4 ++ .../qpid/client/DeliveryCountTrackerTest.java | 25 ++++++++++ .../client/message/AbstractJMSMessageTest.java | 49 +++++++++++++++++++ .../test/unit/client/MaxDeliveryCountTest.java | 9 ++++ 12 files changed, 164 insertions(+), 29 deletions(-) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index d8ceb3278d..0e5eb3b3d9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -756,12 +756,16 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa msgId = message.getJMSMessageID(); if(msgId != null) { - _tracker.recordMessage(msgId, message.getDeliveryTag()); + int count = _tracker.recordMessage(msgId, message.getDeliveryTag()); + if(count != 0) + { + message.setJMSXDeliveryCount(count); + } } } catch (JMSException e) { - _logger.warn("Exception while retrieving JMSMessageID from message" + + _logger.error("Exception while retrieving JMSMessageID from message" + " with deliveryTag '" + message.getDeliveryTag() + "': " + e, e); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 9f8c783b4b..dda995d631 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -82,6 +82,9 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer // On the receiving side, this will be read in to the JMSXUserID as well. messageProps.setUserId(userIDBytes); + + //ensure JMSXDeliverCount is cleared + delegate.setJMSXDeliveryCount(null); if (messageId != null) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index f726cd02a2..0c8c7b800b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -125,6 +125,9 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer contentHeaderProperties.setDeliveryMode((byte) deliveryMode); contentHeaderProperties.setPriority((byte) priority); + + //ensure JMSXDeliveryCount is cleared + delegate.setJMSXDeliveryCount(null); final int size = (payload != null) ? payload.limit() : 0; final int contentBodyFrameCount = calculateContentBodyFrameCount(payload); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java b/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java index 7cc548915c..5948933da1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java @@ -32,6 +32,7 @@ public enum CustomJMSXProperty JMS_QPID_DESTTYPE, JMSXGroupID, JMSXGroupSeq, + JMSXDeliveryCount, JMSXUserID; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/DeliveryCountTracker.java b/qpid/java/client/src/main/java/org/apache/qpid/client/DeliveryCountTracker.java index af01b26aec..da6b6f2be2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/DeliveryCountTracker.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/DeliveryCountTracker.java @@ -90,44 +90,46 @@ public class DeliveryCountTracker * * @param msgID the JMSMessageID of the message to track * @param deliveryTag the delivery tag of the most recent encounter of the message + * @return the count of how many times the message has now been seen, or 0 if unknown. */ - public synchronized void recordMessage(String msgID, long deliveryTag) + public synchronized int recordMessage(String msgID, long deliveryTag) { - try + int count = 0; + + if(msgID == null) { - if(msgID == null) - { - //we can't distinguish between different - //messages without a JMSMessageID, so skip - return; - } + //we can't distinguish between different + //messages without a JMSMessageID, so skip + return count; + } - _jmsIDtoDeliverTag.put(msgID, deliveryTag); + _jmsIDtoDeliverTag.put(msgID, deliveryTag); - Integer count = _receivedMsgIDs.get(msgID); + //using Integer to allow null check for the count map + Integer mapCount = _receivedMsgIDs.get(msgID); - if(count != null) - { - ++count; - if (_logger.isDebugEnabled()) - { - _logger.debug("Incrementing count for JMSMessageID: '" + msgID + "', value now: " + count); - } - _receivedMsgIDs.put(msgID, count); - } - else + if(mapCount != null) + { + count = ++mapCount; + + if (_logger.isDebugEnabled()) { - if (_logger.isDebugEnabled()) - { - _logger.debug("Recording first sighting of JMSMessageID '" + msgID + "'"); - } - _receivedMsgIDs.put(msgID, 1); + _logger.debug("Incrementing count for JMSMessageID: '" + msgID + "', value now: " + count); } + _receivedMsgIDs.put(msgID, count); } - catch(Exception e) + else { - _logger.warn("Exception recording delivery count for message: " + msgID, e); + count = 1; + + if (_logger.isDebugEnabled()) + { + _logger.debug("Recording first sighting of JMSMessageID '" + msgID + "'"); + } + _receivedMsgIDs.put(msgID, count); } + + return count; } /** diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java index 314508805d..04b75484cf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java @@ -134,4 +134,13 @@ public interface AMQMessageDelegate long getDeliveryTag(); void setJMSMessageID(final UUID messageId) throws JMSException; + + /** + * Set the JMSXDeliveryCount property to the given value. + * + * If the value given is null, the JMSXDeliveryCount property is removed. + * + * @param count the value to set + */ + void setJMSXDeliveryCount(final Integer count) throws JMSException; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index 3660364594..637c0aa4b5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -912,4 +912,18 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate return _deliveryProps; } + public void setJMSXDeliveryCount(Integer count) throws JMSException + { + String propName = CustomJMSXProperty.JMSXDeliveryCount.toString(); + + if(count !=null) + { + setApplicationHeader(propName, count); + } + else + { + removeProperty(propName); + } + } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java index aed15d14c7..0c3da5d373 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java @@ -574,5 +574,17 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate return _deliveryTag; } + public void setJMSXDeliveryCount(Integer count) throws JMSException + { + String propName = CustomJMSXProperty.JMSXDeliveryCount.toString(); + if(count !=null) + { + getJmsHeaders().setInteger(propName, count); + } + else + { + removeProperty(propName); + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 60c6048e43..93a6ebd850 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -534,4 +534,8 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message _delegate.removeProperty(propertyName); } + public void setJMSXDeliveryCount(Integer count) throws JMSException + { + _delegate.setJMSXDeliveryCount(count); + } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/DeliveryCountTrackerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/DeliveryCountTrackerTest.java index 2f3a186d82..92f6d0ad90 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/DeliveryCountTrackerTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/DeliveryCountTrackerTest.java @@ -155,4 +155,29 @@ public class DeliveryCountTrackerTest extends TestCase } } } + + /** + * Test the count value return as message ID's are tracked + */ + public void testCountReturnedWhenTracking() throws Exception + { + //check the value is 0 when provided with a null instead of a String id + assertEquals("value should have been 0", 0, _tracker.recordMessage(null, 0)); + + //check the value is 0 when provided with a null instead of a String id + //and the same delivery tag as before + assertEquals("value should have been 0", 0, _tracker.recordMessage(null, 0)); + + //check the value is 1 when provided with an id for the first time + for(int id=1 ; id <= CAPACITY; id ++) + { + assertEquals("value should have been 1", 1, _tracker.recordMessage(String.valueOf(id), id)); + } + + //check the value is 2 when provided with an id for the second time + for(int id=1 ; id <= CAPACITY; id ++) + { + assertEquals("value should have been 2", 2, _tracker.recordMessage(String.valueOf(id), id)); + } + } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java index b4774113be..2cbbc3b23d 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java @@ -33,4 +33,53 @@ public class AbstractJMSMessageTest extends TestCase } } + public void testSetJMSXDeliveryCount_0_8() throws JMSException + { + doTestSetJMSXDeliveryCount(AMQMessageDelegateFactory.FACTORY_0_8); + } + + public void testSetJMSXDeliveryCount_0_10() throws JMSException + { + doTestSetJMSXDeliveryCount(AMQMessageDelegateFactory.FACTORY_0_10); + } + + private void doTestSetJMSXDeliveryCount(AMQMessageDelegateFactory factory) throws JMSException + { + AbstractJMSMessage abstractMessage = new JMSTextMessage(factory); + + //verify the property is not set + assertFalse("property should not yet exist", abstractMessage.propertyExists("JMSXDeliveryCount")); + + //check that retrieving the property now throws the expected NFE + try + { + abstractMessage.getIntProperty("JMSXDeliveryCount"); + fail("property should not be set, so NumberFormatException should be thrown"); + } + catch (NumberFormatException e) + { + //expected, ignore. + } + + //set the value, verify retrieval + abstractMessage.setJMSXDeliveryCount(5); + assertEquals("Value was incorrect", 5, abstractMessage.getIntProperty("JMSXDeliveryCount")); + + //remove the property + abstractMessage.setJMSXDeliveryCount(null); + + //verify property is cleared + assertFalse("property should not yet exist", abstractMessage.propertyExists("JMSXDeliveryCount")); + + //check that retrieving the property now throws the expected NFE + try + { + abstractMessage.getIntProperty("JMSXDeliveryCount"); + fail("property should not be set, so NumberFormatException should be thrown"); + } + catch (NumberFormatException e) + { + //expected, ignore. + } + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java index 5f2935a7e0..576bb3c90c 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java @@ -42,6 +42,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.CustomJMSXProperty; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.test.utils.QpidTestCase; @@ -402,6 +403,10 @@ public class MaxDeliveryCountTest extends QpidTestCase } _deliveryAttempts++; //increment count of times the current rolled back/recovered message(s) have been seen + + //verify JMSXDeliveryCount + assertEquals("Value for JMSXDeliveryCount was not as expected", + _deliveryAttempts, message.getIntProperty("JMSXDeliveryCount")); switch(deliveryMode) { @@ -541,6 +546,10 @@ public class MaxDeliveryCountTest extends QpidTestCase _deliveryAttempts++; //increment count of times the current rolled back/recovered message(s) have been seen + //verify JMSXDeliveryCount + assertEquals("Value for JMSXDeliveryCount was not as expected", + _deliveryAttempts, message.getIntProperty("JMSXDeliveryCount")); + switch(deliveryMode) { case Session.SESSION_TRANSACTED: -- cgit v1.2.1