/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.server.message; import org.apache.qpid.AMQException; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.*; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; public class MessageProtocolConversionTest extends QpidBrokerTestCase { private static final int TIMEOUT = 1500; private Connection _connection_0_9_1; private Connection _connection_0_10; private static final boolean BOOLEAN_TEST_VAL = true; private static final byte BYTE_TEST_VAL = (byte) 4; private static final byte[] BYTES_TEST_VAL = {5, 4, 3, 2, 1}; private static final char CHAR_TEST_VAL = 'x'; private static final double DOUBLE_TEST_VAL = Double.MAX_VALUE; private static final float FLOAT_TEST_VAL = Float.MAX_VALUE; private static final int INT_TEST_VAL = -73; private static final long LONG_TEST_VAL = Long.MIN_VALUE / 2l; private static final short SHORT_TEST_VAL = -586; private static final String STRING_TEST_VAL = "This is a test text message"; @Override public void setUp() throws Exception { super.setUp(); setTestSystemProperty(ClientProperties.AMQP_VERSION, "0-10"); _connection_0_10 = getConnection(); setTestSystemProperty(ClientProperties.AMQP_VERSION, "0-9-1"); _connection_0_9_1 = getConnection(); } public void test0_9_1_to_0_10_conversion() throws JMSException, AMQException { doConversionTests(_connection_0_9_1, _connection_0_10); } public void test_0_10_to_0_9_1_conversion() throws JMSException, AMQException { doConversionTests(_connection_0_10, _connection_0_9_1); } private void doConversionTests(Connection producerConn, Connection consumerConn) throws JMSException, AMQException { Session producerSession = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE); Session consumerSession = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = getTestQueue(); MessageProducer producer = producerSession.createProducer(queue); MessageConsumer consumer = consumerSession.createConsumer(queue); consumerConn.start(); producerConn.start(); // Text Message Message m = producerSession.createTextMessage(STRING_TEST_VAL); producer.send(m); m = consumer.receive(TIMEOUT); assertNotNull("Expected text message did not arrive", m); assertTrue("Received message not an instance of TextMessage (" + m.getClass().getName() + " instead)", m instanceof TextMessage); assertEquals("Message text not as expected", STRING_TEST_VAL, ((TextMessage) m).getText()); // Map Message MapMessage mapMessage = producerSession.createMapMessage(); mapMessage.setBoolean("boolean", BOOLEAN_TEST_VAL); mapMessage.setByte("byte", BYTE_TEST_VAL); mapMessage.setBytes("bytes", BYTES_TEST_VAL); mapMessage.setChar("char", CHAR_TEST_VAL); mapMessage.setDouble("double", DOUBLE_TEST_VAL); mapMessage.setFloat("float", FLOAT_TEST_VAL); mapMessage.setInt("int", INT_TEST_VAL); mapMessage.setLong("long", LONG_TEST_VAL); mapMessage.setShort("short", SHORT_TEST_VAL); mapMessage.setString("string", STRING_TEST_VAL); producer.send(mapMessage); m = consumer.receive(TIMEOUT); assertNotNull("Expected map message message did not arrive", m); assertTrue("Received message not an instance of MapMessage (" + m.getClass().getName() + " instead)", m instanceof MapMessage); MapMessage receivedMapMessage = (MapMessage) m; assertEquals("Map message boolean value not as expected", BOOLEAN_TEST_VAL, receivedMapMessage.getBoolean("boolean")); assertEquals("Map message byte value not as expected", BYTE_TEST_VAL, receivedMapMessage.getByte("byte")); assertTrue("Map message bytes value not as expected", Arrays.equals(BYTES_TEST_VAL, receivedMapMessage.getBytes("bytes"))); assertEquals("Map message char value not as expected", CHAR_TEST_VAL, receivedMapMessage.getChar("char")); assertEquals("Map message double value not as expected", DOUBLE_TEST_VAL, receivedMapMessage.getDouble("double")); assertEquals("Map message float value not as expected", FLOAT_TEST_VAL, receivedMapMessage.getFloat("float")); assertEquals("Map message int value not as expected", INT_TEST_VAL, receivedMapMessage.getInt("int")); assertEquals("Map message long value not as expected", LONG_TEST_VAL, receivedMapMessage.getLong("long")); assertEquals("Map message short value not as expected", SHORT_TEST_VAL, receivedMapMessage.getShort("short")); assertEquals("Map message string value not as expected", STRING_TEST_VAL, receivedMapMessage.getString("string")); ArrayList expectedNames = Collections.list(mapMessage.getMapNames()); Collections.sort(expectedNames); ArrayList actualNames = Collections.list(receivedMapMessage.getMapNames()); Collections.sort(actualNames); assertEquals("Map message keys not as expected", expectedNames, actualNames); // Stream Message StreamMessage streamMessage = producerSession.createStreamMessage(); streamMessage.writeString(STRING_TEST_VAL); streamMessage.writeShort(SHORT_TEST_VAL); streamMessage.writeLong(LONG_TEST_VAL); streamMessage.writeInt(INT_TEST_VAL); streamMessage.writeFloat(FLOAT_TEST_VAL); streamMessage.writeDouble(DOUBLE_TEST_VAL); streamMessage.writeChar(CHAR_TEST_VAL); streamMessage.writeBytes(BYTES_TEST_VAL); streamMessage.writeByte(BYTE_TEST_VAL); streamMessage.writeBoolean(BOOLEAN_TEST_VAL); producer.send(streamMessage); m = consumer.receive(TIMEOUT); assertNotNull("Expected stream message message did not arrive", m); assertTrue("Received message not an instance of StreamMessage (" + m.getClass().getName() + " instead)", m instanceof StreamMessage); StreamMessage receivedStreamMessage = (StreamMessage) m; assertEquals("Stream message read string not as expected", STRING_TEST_VAL, receivedStreamMessage.readString()); assertEquals("Stream message read short not as expected", SHORT_TEST_VAL, receivedStreamMessage.readShort()); assertEquals("Stream message read long not as expected", LONG_TEST_VAL, receivedStreamMessage.readLong()); assertEquals("Stream message read int not as expected", INT_TEST_VAL, receivedStreamMessage.readInt()); assertEquals("Stream message read float not as expected", FLOAT_TEST_VAL, receivedStreamMessage.readFloat()); assertEquals("Stream message read double not as expected", DOUBLE_TEST_VAL, receivedStreamMessage.readDouble()); assertEquals("Stream message read char not as expected", CHAR_TEST_VAL, receivedStreamMessage.readChar()); byte[] bytesVal = new byte[BYTES_TEST_VAL.length]; receivedStreamMessage.readBytes(bytesVal); assertTrue("Stream message read bytes not as expected", Arrays.equals(BYTES_TEST_VAL, bytesVal)); assertEquals("Stream message read byte not as expected", BYTE_TEST_VAL, receivedStreamMessage.readByte()); assertEquals("Stream message read boolean not as expected", BOOLEAN_TEST_VAL, receivedStreamMessage.readBoolean()); try { receivedStreamMessage.readByte(); fail("Unexpected remaining bytes in stream message"); } catch(MessageEOFException e) { // pass } // Object Message ObjectMessage objectMessage = producerSession.createObjectMessage(); objectMessage.setObject(STRING_TEST_VAL); producer.send(objectMessage); m = consumer.receive(TIMEOUT); assertNotNull("Expected object message message did not arrive", m); assertTrue("Received message not an instance of ObjectMessage (" + m.getClass().getName() + " instead)", m instanceof ObjectMessage); ObjectMessage receivedObjectMessage = (ObjectMessage) m; assertEquals("Object message value not as expected", STRING_TEST_VAL, receivedObjectMessage.getObject()); // Bytes Message BytesMessage bytesMessage = producerSession.createBytesMessage(); bytesMessage.writeBytes(BYTES_TEST_VAL); producer.send(bytesMessage); m = consumer.receive(TIMEOUT); assertNotNull("Expected bytes message message did not arrive", m); assertTrue("Received message not an instance of BytesMessage (" + m.getClass().getName() + " instead)", m instanceof BytesMessage); BytesMessage receivedBytesMessage = (BytesMessage) m; bytesVal = new byte[BYTES_TEST_VAL.length]; receivedBytesMessage.readBytes(bytesVal); assertTrue("Bytes message read bytes not as expected", Arrays.equals(BYTES_TEST_VAL, bytesVal)); try { receivedBytesMessage.readByte(); fail("Unexpected remaining bytes in stream message"); } catch(MessageEOFException e) { // pass } // Headers / properties tests Message msg = producerSession.createMessage(); msg.setJMSCorrelationID("testCorrelationId"); msg.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT); msg.setJMSPriority(7); msg.setJMSType("testType"); msg.setBooleanProperty("boolean", BOOLEAN_TEST_VAL); msg.setByteProperty("byte", BYTE_TEST_VAL); msg.setDoubleProperty("double", DOUBLE_TEST_VAL); msg.setFloatProperty("float", FLOAT_TEST_VAL); msg.setIntProperty("int", INT_TEST_VAL); msg.setLongProperty("long", LONG_TEST_VAL); msg.setShortProperty("short", SHORT_TEST_VAL); msg.setStringProperty("string", STRING_TEST_VAL); producer.send(msg); m = consumer.receive(TIMEOUT); assertNotNull("Expected message did not arrive", m); assertEquals("JMSMessageID differs", msg.getJMSMessageID(), m.getJMSMessageID()); assertEquals("JMSCorrelationID differs",msg.getJMSCorrelationID(),m.getJMSCorrelationID()); assertEquals("JMSDeliveryMode differs",msg.getJMSDeliveryMode(),m.getJMSDeliveryMode()); assertEquals("JMSPriority differs",msg.getJMSPriority(),m.getJMSPriority()); assertEquals("JMSType differs",msg.getJMSType(),m.getJMSType()); assertEquals("Message boolean property not as expected", BOOLEAN_TEST_VAL, m.getBooleanProperty("boolean")); assertEquals("Message byte property not as expected", BYTE_TEST_VAL, m.getByteProperty("byte")); assertEquals("Message double property not as expected", DOUBLE_TEST_VAL, m.getDoubleProperty("double")); assertEquals("Message float property not as expected", FLOAT_TEST_VAL, m.getFloatProperty("float")); assertEquals("Message int property not as expected", INT_TEST_VAL, m.getIntProperty("int")); assertEquals("Message long property not as expected", LONG_TEST_VAL, m.getLongProperty("long")); assertEquals("Message short property not as expected", SHORT_TEST_VAL, m.getShortProperty("short")); assertEquals("Message string property not as expected", STRING_TEST_VAL, m.getStringProperty("string")); ArrayList sentPropNames = Collections.list(msg.getPropertyNames()); Collections.sort(sentPropNames); ArrayList receivedPropNames = Collections.list(m.getPropertyNames()); Collections.sort(receivedPropNames); // Shouldn't really need to do this, the client should be hiding these from us removeSyntheticProperties(sentPropNames); removeSyntheticProperties(receivedPropNames); assertEquals("Property names were not as expected", sentPropNames, receivedPropNames); // Test Reply To Queue Destination replyToDestination = producerSession.createTemporaryQueue(); MessageConsumer replyToConsumer = producerSession.createConsumer(replyToDestination); msg = producerSession.createMessage(); msg.setJMSReplyTo(replyToDestination); producer.send(msg); m = consumer.receive(TIMEOUT); assertNotNull("Expected message did not arrive", m); assertNotNull("Message does not have ReplyTo set", m.getJMSReplyTo()); MessageProducer responseProducer = consumerSession.createProducer(m.getJMSReplyTo()); responseProducer.send(consumerSession.createMessage()); assertNotNull("Expected response message did not arrive", replyToConsumer.receive(TIMEOUT)); // Test Reply To Topic replyToDestination = producerSession.createTemporaryTopic(); replyToConsumer = producerSession.createConsumer(replyToDestination); msg = producerSession.createMessage(); msg.setJMSReplyTo(replyToDestination); producer.send(msg); m = consumer.receive(TIMEOUT); assertNotNull("Expected message did not arrive", m); assertNotNull("Message does not have ReplyTo set", m.getJMSReplyTo()); responseProducer = consumerSession.createProducer(m.getJMSReplyTo()); responseProducer.send(consumerSession.createMessage()); assertNotNull("Expected response message did not arrive", replyToConsumer.receive(TIMEOUT)); } private void removeSyntheticProperties(ArrayList propNames) { Iterator nameIter = propNames.iterator(); while(nameIter.hasNext()) { String propName = nameIter.next(); if(propName.startsWith("x-jms") || propName.startsWith("JMS_QPID")) { nameIter.remove(); } } } @Override public void tearDown() throws Exception { _connection_0_9_1.close(); _connection_0_10.close(); super.tearDown(); } }