diff options
author | Martin Ritchie <ritchiem@apache.org> | 2009-12-08 11:38:46 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2009-12-08 11:38:46 +0000 |
commit | e864e0a9158d69ffcebbcfae0bb1c900bd3478c3 (patch) | |
tree | ccf49893b43fe94647a7dc8bdc22fe7879d55dc4 | |
parent | 57d9d61d5fe2cff10b130a03ed08b802b80d43ab (diff) | |
download | qpid-python-e864e0a9158d69ffcebbcfae0bb1c900bd3478c3.tar.gz |
QPID-2242 : Update to 0-8 producer to set the JMS_QPID_DESTTYPE when sending messages.
This update required a change to the FieldTable to clear any _encodedBuffer when setting a new value
Provided test in JMSDestintationTest. This test requires a broker that supports both 0-8/9 and 0-10 as we must first use a 0-10 connection to send a message, which does not set JMS_QPID_DESTTYPE, then receive the message on 0-8/9. This allows us to validate that messages recieved without JMS_QPID_DESTTYPE set will can correctly set the value without a BufferOverflow Exception when simply forwarding the message.
Excluded test from all cpp and InVM runs as test requires a 0-8/9 and 0-10 capable broker. Can reinstate to InVM when multiprotocol testing is capable.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@888345 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 133 insertions, 12 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index 048065eac9..f726cd02a2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -24,6 +24,8 @@ import java.util.UUID; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.Topic; +import javax.jms.Queue; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.client.message.AbstractJMSMessage; @@ -87,7 +89,25 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties(); contentHeaderProperties.setUserId(_userID); - + + //Set the JMS_QPID_DESTTYPE for 0-8/9 messages + int type; + if (destination instanceof Topic) + { + type = AMQDestination.TOPIC_TYPE; + } + else if (destination instanceof Queue) + { + type = AMQDestination.QUEUE_TYPE; + } + else + { + type = AMQDestination.UNKNOWN_TYPE; + } + + //Set JMS_QPID_DESTTYPE + delegate.getContentHeaderProperties().getHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type); + if (!_disableTimestamps) { final long currentTime = System.currentTimeMillis(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index 9b2f9b3969..341238c667 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -131,9 +131,10 @@ public class FieldTable } else if ((_encodedForm != null) && (val != null)) { - EncodingUtils.writeShortStringBytes(_encodedForm, key); - val.writeToBuffer(_encodedForm); - + // We have updated data to store in the buffer + // So clear the _encodedForm to allow it to be rebuilt later + // this is safer than simply appending to any existing buffer. + _encodedForm = null; } else if (val == null) { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java index 036f53827b..60db1a25d7 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java @@ -20,7 +20,10 @@ */ package org.apache.qpid.test.client.message; +import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.CustomJMSXProperty; +import org.apache.qpid.client.configuration.ClientProperties; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.test.utils.JMXTestUtils; import org.apache.qpid.test.utils.QpidTestCase; @@ -30,11 +33,13 @@ import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; +import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.Topic; import javax.management.openmbean.CompositeDataSupport; import javax.management.openmbean.TabularData; +import java.nio.BufferOverflowException; import java.util.Iterator; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -45,7 +50,7 @@ import java.util.concurrent.TimeUnit; * When a message is received, its JMSDestination value must be equivalent to * the value assigned when it was sent. */ -public class JMSDestinationTest extends QpidTestCase implements MessageListener +public class JMSDestinationTest extends QpidTestCase { private Connection _connection; @@ -59,7 +64,7 @@ public class JMSDestinationTest extends QpidTestCase implements MessageListener { //Ensure JMX management is enabled for MovedToQueue test setConfigurationProperty("management.enabled", "true"); - + super.setUp(); _connection = getConnection(); @@ -129,7 +134,7 @@ public class JMSDestinationTest extends QpidTestCase implements MessageListener * current consumer destination. * * This test can only be run against the Java broker as it uses JMX to move - * messages between queues. + * messages between queues. * * @throws Exception */ @@ -158,7 +163,7 @@ public class JMSDestinationTest extends QpidTestCase implements MessageListener ManagedQueue managedQueue = jmxUtils. getManagedObject(ManagedQueue.class, jmxUtils.getQueueObjectName(getConnectionFactory().getVirtualPath().substring(1), - getTestQueueName())); + getTestQueueName())); // Find the first message on the queue TabularData data = managedQueue.viewMessages(1L, 2L); @@ -220,7 +225,14 @@ public class JMSDestinationTest extends QpidTestCase implements MessageListener _message = null; _receiveMessage = new CountDownLatch(1); - consumer.setMessageListener(this); + consumer.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + _message = message; + _receiveMessage.countDown(); + } + }); assertTrue("Timed out waiting for message to be received ", _receiveMessage.await(1, TimeUnit.SECONDS)); @@ -233,9 +245,86 @@ public class JMSDestinationTest extends QpidTestCase implements MessageListener assertEquals("Incorrect Destination type", queue.getClass(), destination.getClass()); } - public void onMessage(Message message) + /** + * Test a message received without the JMS_QPID_DESTTYPE can be resent + * and correctly have the property set. + * + * To do this we need to create a 0-10 connection and send a message + * which is then received by a 0-8/9 client. + * + * @throws Exception + */ + public void testReceiveResend() throws Exception { - _message = message; - _receiveMessage.countDown(); + // Create a 0-10 Connection and send message + setSystemProperty(ClientProperties.AMQP_VERSION, "0-10"); + + Connection connection010 = getConnection(); + + Session session010 = connection010.createSession(true, Session.SESSION_TRANSACTED); + + // Create queue for testing + Queue queue = session010.createQueue(getTestQueueName()); + + // Ensure queue exists + session010.createConsumer(queue).close(); + + sendMessage(session010, queue, 1); + + // Close the 010 connection + connection010.close(); + + // Create a 0-8 Connection and receive message + setSystemProperty(ClientProperties.AMQP_VERSION, "0-8"); + + Connection connection08 = getConnection(); + + Session session08 = connection08.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageConsumer consumer = session08.createConsumer(queue); + + connection08.start(); + + Message message = consumer.receive(1000); + + assertNotNull("Didn't receive 0-10 message.", message); + + // Validate that JMS_QPID_DESTTYPE is not set + try + { + message.getIntProperty(CustomJMSXProperty.JMS_QPID_DESTTYPE.toString()); + fail("JMS_QPID_DESTTYPE should not be set, so should throw NumberFormatException"); + } + catch (NumberFormatException nfe) + { + + } + + // Resend message back to queue and validate that + // a) getJMSDestination works without the JMS_QPID_DESTTYPE + // b) we can actually send without a BufferOverFlow. + + MessageProducer producer = session08.createProducer(queue); + + try + { + producer.send(message); + } + catch (BufferOverflowException bofe) + { + // Print the stack trace so we can validate where the execption occured. + bofe.printStackTrace(); + fail("BufferOverflowException thrown during send"); + } + + message = consumer.receive(1000); + + assertNotNull("Didn't receive recent 0-8 message.", message); + + // Validate that JMS_QPID_DESTTYPE is not set + assertEquals("JMS_QPID_DESTTYPE should be set to a Queue", AMQDestination.QUEUE_TYPE, + message.getIntProperty(CustomJMSXProperty.JMS_QPID_DESTTYPE.toString())); + } + } diff --git a/qpid/java/test-profiles/08InVMExcludes b/qpid/java/test-profiles/08InVMExcludes index 065076d3b6..915c1ff0c2 100644 --- a/qpid/java/test-profiles/08InVMExcludes +++ b/qpid/java/test-profiles/08InVMExcludes @@ -4,3 +4,6 @@ // QPID-2097 exclude until InVM JMX test runs are reliable org.apache.qpid.server.queue.ProducerFlowControlTest#testFlowControlAttributeModificationViaJMX + +// This test requires a broker capable of 0-8/9 and 0-10 +org.apache.qpid.test.client.message.JMSDestinationTest#testReceiveResend diff --git a/qpid/java/test-profiles/cpp.excludes b/qpid/java/test-profiles/cpp.excludes index 4d7363eb75..64417a0edc 100644 --- a/qpid/java/test-profiles/cpp.excludes +++ b/qpid/java/test-profiles/cpp.excludes @@ -1,2 +1,10 @@ +//====================================================================== +//Exclude the following tests when running all cpp test profilies +//====================================================================== + // This test requires JMX interface to move messages org.apache.qpid.test.client.message.JMSDestinationTest#testMovedToQueue + +// This test requires a broker capable of 0-8/9 and 0-10 +org.apache.qpid.test.client.message.JMSDestinationTest#testReceiveResend + |