summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2010-12-15 16:07:08 +0000
committerRobert Gemmell <robbie@apache.org>2010-12-15 16:07:08 +0000
commitc3d7af006982d495025164d4b1c05c1e670cb148 (patch)
treea062481b24f85b72ea1b504e73ce72add8a7e977
parent7e0a6783f0d8b732300437514ed5ab5ebe3e84f2 (diff)
downloadqpid-python-c3d7af006982d495025164d4b1c05c1e670cb148.tar.gz
QPID-2980: expose the delivery count as a property
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1049624 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java3
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java3
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java1
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/DeliveryCountTracker.java56
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java9
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java14
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java12
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java4
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/DeliveryCountTrackerTest.java25
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java49
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java9
12 files changed, 164 insertions, 29 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index d8ceb3278d..0e5eb3b3d9 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -756,12 +756,16 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
msgId = message.getJMSMessageID();
if(msgId != null)
{
- _tracker.recordMessage(msgId, message.getDeliveryTag());
+ int count = _tracker.recordMessage(msgId, message.getDeliveryTag());
+ if(count != 0)
+ {
+ message.setJMSXDeliveryCount(count);
+ }
}
}
catch (JMSException e)
{
- _logger.warn("Exception while retrieving JMSMessageID from message" +
+ _logger.error("Exception while retrieving JMSMessageID from message" +
" with deliveryTag '" + message.getDeliveryTag() + "': " + e, e);
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 9f8c783b4b..dda995d631 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -82,6 +82,9 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
// On the receiving side, this will be read in to the JMSXUserID as well.
messageProps.setUserId(userIDBytes);
+
+ //ensure JMSXDeliverCount is cleared
+ delegate.setJMSXDeliveryCount(null);
if (messageId != null)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index f726cd02a2..0c8c7b800b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -125,6 +125,9 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
contentHeaderProperties.setPriority((byte) priority);
+
+ //ensure JMSXDeliveryCount is cleared
+ delegate.setJMSXDeliveryCount(null);
final int size = (payload != null) ? payload.limit() : 0;
final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java b/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
index 7cc548915c..5948933da1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
@@ -32,6 +32,7 @@ public enum CustomJMSXProperty
JMS_QPID_DESTTYPE,
JMSXGroupID,
JMSXGroupSeq,
+ JMSXDeliveryCount,
JMSXUserID;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/DeliveryCountTracker.java b/qpid/java/client/src/main/java/org/apache/qpid/client/DeliveryCountTracker.java
index af01b26aec..da6b6f2be2 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/DeliveryCountTracker.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/DeliveryCountTracker.java
@@ -90,44 +90,46 @@ public class DeliveryCountTracker
*
* @param msgID the JMSMessageID of the message to track
* @param deliveryTag the delivery tag of the most recent encounter of the message
+ * @return the count of how many times the message has now been seen, or 0 if unknown.
*/
- public synchronized void recordMessage(String msgID, long deliveryTag)
+ public synchronized int recordMessage(String msgID, long deliveryTag)
{
- try
+ int count = 0;
+
+ if(msgID == null)
{
- if(msgID == null)
- {
- //we can't distinguish between different
- //messages without a JMSMessageID, so skip
- return;
- }
+ //we can't distinguish between different
+ //messages without a JMSMessageID, so skip
+ return count;
+ }
- _jmsIDtoDeliverTag.put(msgID, deliveryTag);
+ _jmsIDtoDeliverTag.put(msgID, deliveryTag);
- Integer count = _receivedMsgIDs.get(msgID);
+ //using Integer to allow null check for the count map
+ Integer mapCount = _receivedMsgIDs.get(msgID);
- if(count != null)
- {
- ++count;
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Incrementing count for JMSMessageID: '" + msgID + "', value now: " + count);
- }
- _receivedMsgIDs.put(msgID, count);
- }
- else
+ if(mapCount != null)
+ {
+ count = ++mapCount;
+
+ if (_logger.isDebugEnabled())
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Recording first sighting of JMSMessageID '" + msgID + "'");
- }
- _receivedMsgIDs.put(msgID, 1);
+ _logger.debug("Incrementing count for JMSMessageID: '" + msgID + "', value now: " + count);
}
+ _receivedMsgIDs.put(msgID, count);
}
- catch(Exception e)
+ else
{
- _logger.warn("Exception recording delivery count for message: " + msgID, e);
+ count = 1;
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Recording first sighting of JMSMessageID '" + msgID + "'");
+ }
+ _receivedMsgIDs.put(msgID, count);
}
+
+ return count;
}
/**
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
index 314508805d..04b75484cf 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
@@ -134,4 +134,13 @@ public interface AMQMessageDelegate
long getDeliveryTag();
void setJMSMessageID(final UUID messageId) throws JMSException;
+
+ /**
+ * Set the JMSXDeliveryCount property to the given value.
+ *
+ * If the value given is null, the JMSXDeliveryCount property is removed.
+ *
+ * @param count the value to set
+ */
+ void setJMSXDeliveryCount(final Integer count) throws JMSException;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index 3660364594..637c0aa4b5 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -912,4 +912,18 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
return _deliveryProps;
}
+ public void setJMSXDeliveryCount(Integer count) throws JMSException
+ {
+ String propName = CustomJMSXProperty.JMSXDeliveryCount.toString();
+
+ if(count !=null)
+ {
+ setApplicationHeader(propName, count);
+ }
+ else
+ {
+ removeProperty(propName);
+ }
+ }
+
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
index aed15d14c7..0c3da5d373 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
@@ -574,5 +574,17 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
return _deliveryTag;
}
+ public void setJMSXDeliveryCount(Integer count) throws JMSException
+ {
+ String propName = CustomJMSXProperty.JMSXDeliveryCount.toString();
+ if(count !=null)
+ {
+ getJmsHeaders().setInteger(propName, count);
+ }
+ else
+ {
+ removeProperty(propName);
+ }
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 60c6048e43..93a6ebd850 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -534,4 +534,8 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message
_delegate.removeProperty(propertyName);
}
+ public void setJMSXDeliveryCount(Integer count) throws JMSException
+ {
+ _delegate.setJMSXDeliveryCount(count);
+ }
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/DeliveryCountTrackerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/DeliveryCountTrackerTest.java
index 2f3a186d82..92f6d0ad90 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/DeliveryCountTrackerTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/DeliveryCountTrackerTest.java
@@ -155,4 +155,29 @@ public class DeliveryCountTrackerTest extends TestCase
}
}
}
+
+ /**
+ * Test the count value return as message ID's are tracked
+ */
+ public void testCountReturnedWhenTracking() throws Exception
+ {
+ //check the value is 0 when provided with a null instead of a String id
+ assertEquals("value should have been 0", 0, _tracker.recordMessage(null, 0));
+
+ //check the value is 0 when provided with a null instead of a String id
+ //and the same delivery tag as before
+ assertEquals("value should have been 0", 0, _tracker.recordMessage(null, 0));
+
+ //check the value is 1 when provided with an id for the first time
+ for(int id=1 ; id <= CAPACITY; id ++)
+ {
+ assertEquals("value should have been 1", 1, _tracker.recordMessage(String.valueOf(id), id));
+ }
+
+ //check the value is 2 when provided with an id for the second time
+ for(int id=1 ; id <= CAPACITY; id ++)
+ {
+ assertEquals("value should have been 2", 2, _tracker.recordMessage(String.valueOf(id), id));
+ }
+ }
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java
index b4774113be..2cbbc3b23d 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java
@@ -33,4 +33,53 @@ public class AbstractJMSMessageTest extends TestCase
}
}
+ public void testSetJMSXDeliveryCount_0_8() throws JMSException
+ {
+ doTestSetJMSXDeliveryCount(AMQMessageDelegateFactory.FACTORY_0_8);
+ }
+
+ public void testSetJMSXDeliveryCount_0_10() throws JMSException
+ {
+ doTestSetJMSXDeliveryCount(AMQMessageDelegateFactory.FACTORY_0_10);
+ }
+
+ private void doTestSetJMSXDeliveryCount(AMQMessageDelegateFactory<?> factory) throws JMSException
+ {
+ AbstractJMSMessage abstractMessage = new JMSTextMessage(factory);
+
+ //verify the property is not set
+ assertFalse("property should not yet exist", abstractMessage.propertyExists("JMSXDeliveryCount"));
+
+ //check that retrieving the property now throws the expected NFE
+ try
+ {
+ abstractMessage.getIntProperty("JMSXDeliveryCount");
+ fail("property should not be set, so NumberFormatException should be thrown");
+ }
+ catch (NumberFormatException e)
+ {
+ //expected, ignore.
+ }
+
+ //set the value, verify retrieval
+ abstractMessage.setJMSXDeliveryCount(5);
+ assertEquals("Value was incorrect", 5, abstractMessage.getIntProperty("JMSXDeliveryCount"));
+
+ //remove the property
+ abstractMessage.setJMSXDeliveryCount(null);
+
+ //verify property is cleared
+ assertFalse("property should not yet exist", abstractMessage.propertyExists("JMSXDeliveryCount"));
+
+ //check that retrieving the property now throws the expected NFE
+ try
+ {
+ abstractMessage.getIntProperty("JMSXDeliveryCount");
+ fail("property should not be set, so NumberFormatException should be thrown");
+ }
+ catch (NumberFormatException e)
+ {
+ //expected, ignore.
+ }
+ }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
index 5f2935a7e0..576bb3c90c 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
@@ -42,6 +42,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.CustomJMSXProperty;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -402,6 +403,10 @@ public class MaxDeliveryCountTest extends QpidTestCase
}
_deliveryAttempts++; //increment count of times the current rolled back/recovered message(s) have been seen
+
+ //verify JMSXDeliveryCount
+ assertEquals("Value for JMSXDeliveryCount was not as expected",
+ _deliveryAttempts, message.getIntProperty("JMSXDeliveryCount"));
switch(deliveryMode)
{
@@ -541,6 +546,10 @@ public class MaxDeliveryCountTest extends QpidTestCase
_deliveryAttempts++; //increment count of times the current rolled back/recovered message(s) have been seen
+ //verify JMSXDeliveryCount
+ assertEquals("Value for JMSXDeliveryCount was not as expected",
+ _deliveryAttempts, message.getIntProperty("JMSXDeliveryCount"));
+
switch(deliveryMode)
{
case Session.SESSION_TRANSACTED: