summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/server/message/MessageProtocolConversionTest.java
blob: a179b96768f4b95df6738f9632431980c6d1430b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.server.message;

import org.apache.qpid.AMQException;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.url.AMQBindingURL;

import javax.jms.*;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;

public class MessageProtocolConversionTest extends QpidBrokerTestCase
{

    private static final int TIMEOUT = 1500;
    private Connection _connection_0_9_1;
    private Connection _connection_0_10;

    private static final boolean BOOLEAN_TEST_VAL = true;
    private static final byte BYTE_TEST_VAL = (byte) 4;
    private static final byte[] BYTES_TEST_VAL = {5, 4, 3, 2, 1};
    private static final char CHAR_TEST_VAL = 'x';
    private static final double DOUBLE_TEST_VAL = Double.MAX_VALUE;
    private static final float FLOAT_TEST_VAL = Float.MAX_VALUE;
    private static final int INT_TEST_VAL = -73;
    private static final long LONG_TEST_VAL = Long.MIN_VALUE / 2l;
    private static final short SHORT_TEST_VAL = -586;
    private static final String STRING_TEST_VAL = "This is a test text message";

    @Override
    public void setUp() throws Exception
    {
        super.setUp();
        setTestSystemProperty(ClientProperties.AMQP_VERSION, "0-10");
        _connection_0_10 = getConnection();
        setTestSystemProperty(ClientProperties.AMQP_VERSION, "0-9-1");
        _connection_0_9_1 = getConnection();
    }

    public void test0_9_1_to_0_10_conversion() throws JMSException, AMQException
    {
        doConversionTests(_connection_0_9_1, _connection_0_10);
    }

    public void test_0_10_to_0_9_1_conversion() throws JMSException, AMQException
    {

        doConversionTests(_connection_0_10, _connection_0_9_1);
    }

    private void doConversionTests(Connection producerConn, Connection consumerConn) throws JMSException, AMQException
    {
        Session producerSession = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Session consumerSession = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Queue queue = getTestQueue();

        MessageProducer producer = producerSession.createProducer(queue);
        MessageConsumer consumer = consumerSession.createConsumer(queue);

        consumerConn.start();
        producerConn.start();

        // Text Message

        Message m = producerSession.createTextMessage(STRING_TEST_VAL);
        producer.send(m);
        m = consumer.receive(TIMEOUT);

        assertNotNull("Expected text message did not arrive", m);
        assertTrue("Received message not an instance of TextMessage (" + m.getClass().getName() + " instead)", m instanceof TextMessage);
        assertEquals("Message text not as expected", STRING_TEST_VAL, ((TextMessage) m).getText());

        // Map Message

        MapMessage mapMessage = producerSession.createMapMessage();
        mapMessage.setBoolean("boolean", BOOLEAN_TEST_VAL);
        mapMessage.setByte("byte", BYTE_TEST_VAL);
        mapMessage.setBytes("bytes", BYTES_TEST_VAL);
        mapMessage.setChar("char", CHAR_TEST_VAL);
        mapMessage.setDouble("double", DOUBLE_TEST_VAL);
        mapMessage.setFloat("float", FLOAT_TEST_VAL);
        mapMessage.setInt("int", INT_TEST_VAL);
        mapMessage.setLong("long", LONG_TEST_VAL);
        mapMessage.setShort("short", SHORT_TEST_VAL);
        mapMessage.setString("string", STRING_TEST_VAL);

        producer.send(mapMessage);

        m = consumer.receive(TIMEOUT);

        assertNotNull("Expected map message message did not arrive", m);
        assertTrue("Received message not an instance of MapMessage (" + m.getClass().getName() + " instead)", m instanceof MapMessage);
        MapMessage receivedMapMessage = (MapMessage) m;
        assertEquals("Map message boolean value not as expected", BOOLEAN_TEST_VAL, receivedMapMessage.getBoolean("boolean"));
        assertEquals("Map message byte value not as expected", BYTE_TEST_VAL, receivedMapMessage.getByte("byte"));
        assertTrue("Map message bytes value not as expected", Arrays.equals(BYTES_TEST_VAL, receivedMapMessage.getBytes("bytes")));
        assertEquals("Map message char value not as expected", CHAR_TEST_VAL, receivedMapMessage.getChar("char"));
        assertEquals("Map message double value not as expected", DOUBLE_TEST_VAL, receivedMapMessage.getDouble("double"));
        assertEquals("Map message float value not as expected", FLOAT_TEST_VAL, receivedMapMessage.getFloat("float"));
        assertEquals("Map message int value not as expected", INT_TEST_VAL, receivedMapMessage.getInt("int"));
        assertEquals("Map message long value not as expected", LONG_TEST_VAL, receivedMapMessage.getLong("long"));
        assertEquals("Map message short value not as expected", SHORT_TEST_VAL, receivedMapMessage.getShort("short"));
        assertEquals("Map message string value not as expected", STRING_TEST_VAL, receivedMapMessage.getString("string"));
        ArrayList expectedNames = Collections.list(mapMessage.getMapNames());        
        Collections.sort(expectedNames);
        ArrayList actualNames = Collections.list(receivedMapMessage.getMapNames());
        Collections.sort(actualNames);
        assertEquals("Map message keys not as expected", expectedNames, actualNames);
        
        // Stream Message

        StreamMessage streamMessage = producerSession.createStreamMessage();
        streamMessage.writeString(STRING_TEST_VAL);
        streamMessage.writeShort(SHORT_TEST_VAL);
        streamMessage.writeLong(LONG_TEST_VAL);
        streamMessage.writeInt(INT_TEST_VAL);
        streamMessage.writeFloat(FLOAT_TEST_VAL);
        streamMessage.writeDouble(DOUBLE_TEST_VAL);
        streamMessage.writeChar(CHAR_TEST_VAL);
        streamMessage.writeBytes(BYTES_TEST_VAL);
        streamMessage.writeByte(BYTE_TEST_VAL);
        streamMessage.writeBoolean(BOOLEAN_TEST_VAL);

        producer.send(streamMessage);
        
        m = consumer.receive(TIMEOUT);
        
        assertNotNull("Expected stream message message did not arrive", m);
        assertTrue("Received message not an instance of StreamMessage (" + m.getClass().getName() + " instead)", m instanceof StreamMessage);
        StreamMessage receivedStreamMessage = (StreamMessage) m;        
        
        assertEquals("Stream message read string not as expected", STRING_TEST_VAL, receivedStreamMessage.readString());
        assertEquals("Stream message read short not as expected", SHORT_TEST_VAL, receivedStreamMessage.readShort());
        assertEquals("Stream message read long not as expected", LONG_TEST_VAL, receivedStreamMessage.readLong());
        assertEquals("Stream message read int not as expected", INT_TEST_VAL, receivedStreamMessage.readInt());
        assertEquals("Stream message read float not as expected", FLOAT_TEST_VAL, receivedStreamMessage.readFloat());
        assertEquals("Stream message read double not as expected", DOUBLE_TEST_VAL, receivedStreamMessage.readDouble());
        assertEquals("Stream message read char not as expected", CHAR_TEST_VAL, receivedStreamMessage.readChar());
        byte[] bytesVal = new byte[BYTES_TEST_VAL.length];
        receivedStreamMessage.readBytes(bytesVal);
        assertTrue("Stream message read bytes not as expected", Arrays.equals(BYTES_TEST_VAL, bytesVal));
        assertEquals("Stream message read byte not as expected", BYTE_TEST_VAL, receivedStreamMessage.readByte());
        assertEquals("Stream message read boolean not as expected", BOOLEAN_TEST_VAL, receivedStreamMessage.readBoolean());

        try
        {
            receivedStreamMessage.readByte();
            fail("Unexpected remaining bytes in stream message");
        }
        catch(MessageEOFException e)
        {
            // pass
        }

        // Object Message

        ObjectMessage objectMessage = producerSession.createObjectMessage();
        objectMessage.setObject(STRING_TEST_VAL);

        producer.send(objectMessage);

        m = consumer.receive(TIMEOUT);

        assertNotNull("Expected object message message did not arrive", m);
        assertTrue("Received message not an instance of ObjectMessage (" + m.getClass().getName() + " instead)", m instanceof ObjectMessage);
        ObjectMessage receivedObjectMessage = (ObjectMessage) m;
        assertEquals("Object message value not as expected", STRING_TEST_VAL, receivedObjectMessage.getObject());


        // Bytes Message

        BytesMessage bytesMessage = producerSession.createBytesMessage();
        bytesMessage.writeBytes(BYTES_TEST_VAL);

        producer.send(bytesMessage);

        m = consumer.receive(TIMEOUT);

        assertNotNull("Expected bytes message message did not arrive", m);
        assertTrue("Received message not an instance of BytesMessage (" + m.getClass().getName() + " instead)", m instanceof BytesMessage);
        BytesMessage receivedBytesMessage = (BytesMessage) m;
        bytesVal = new byte[BYTES_TEST_VAL.length];
        receivedBytesMessage.readBytes(bytesVal);
        assertTrue("Bytes message read bytes not as expected", Arrays.equals(BYTES_TEST_VAL, bytesVal));

        try
        {
            receivedBytesMessage.readByte();
            fail("Unexpected remaining bytes in stream message");
        }
        catch(MessageEOFException e)
        {
            // pass
        }

        // Headers / properties tests
        
        Message msg = producerSession.createMessage();
        msg.setJMSCorrelationID("testCorrelationId");
        msg.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
        msg.setJMSPriority(7);
        msg.setJMSType("testType");

        msg.setBooleanProperty("boolean", BOOLEAN_TEST_VAL);
        msg.setByteProperty("byte", BYTE_TEST_VAL);
        msg.setDoubleProperty("double", DOUBLE_TEST_VAL);
        msg.setFloatProperty("float", FLOAT_TEST_VAL);
        msg.setIntProperty("int", INT_TEST_VAL);
        msg.setLongProperty("long", LONG_TEST_VAL);
        msg.setShortProperty("short", SHORT_TEST_VAL);
        msg.setStringProperty("string", STRING_TEST_VAL);
        
        producer.send(msg);

        m = consumer.receive(TIMEOUT);
        assertNotNull("Expected message did not arrive", m);
        assertEquals("JMSMessageID differs", msg.getJMSMessageID(), m.getJMSMessageID());
        assertEquals("JMSCorrelationID differs",msg.getJMSCorrelationID(),m.getJMSCorrelationID());
        assertEquals("JMSDeliveryMode differs",msg.getJMSDeliveryMode(),m.getJMSDeliveryMode());
        assertEquals("JMSPriority differs",msg.getJMSPriority(),m.getJMSPriority());
        assertEquals("JMSType differs",msg.getJMSType(),m.getJMSType());

        assertEquals("Message boolean property not as expected", BOOLEAN_TEST_VAL, m.getBooleanProperty("boolean"));
        assertEquals("Message byte property not as expected", BYTE_TEST_VAL, m.getByteProperty("byte"));
        assertEquals("Message double property not as expected", DOUBLE_TEST_VAL, m.getDoubleProperty("double"));
        assertEquals("Message float property not as expected", FLOAT_TEST_VAL, m.getFloatProperty("float"));
        assertEquals("Message int property not as expected", INT_TEST_VAL, m.getIntProperty("int"));
        assertEquals("Message long property not as expected", LONG_TEST_VAL, m.getLongProperty("long"));
        assertEquals("Message short property not as expected", SHORT_TEST_VAL, m.getShortProperty("short"));
        assertEquals("Message string property not as expected", STRING_TEST_VAL, m.getStringProperty("string"));

        ArrayList<String> sentPropNames = Collections.list(msg.getPropertyNames());
        Collections.sort(sentPropNames);
        ArrayList<String> receivedPropNames = Collections.list(m.getPropertyNames());
        Collections.sort(receivedPropNames);

        // Shouldn't really need to do this, the client should be hiding these from us
        removeSyntheticProperties(sentPropNames);
        removeSyntheticProperties(receivedPropNames);

        assertEquals("Property names were not as expected", sentPropNames, receivedPropNames);

        // Test Reply To Queue

        Destination replyToDestination = producerSession.createTemporaryQueue();
        MessageConsumer replyToConsumer = producerSession.createConsumer(replyToDestination);
        msg = producerSession.createMessage();
        msg.setJMSReplyTo(replyToDestination);
        producer.send(msg);

        m = consumer.receive(TIMEOUT);
        assertNotNull("Expected message did not arrive", m);
        assertNotNull("Message does not have ReplyTo set", m.getJMSReplyTo());

        MessageProducer responseProducer = consumerSession.createProducer(m.getJMSReplyTo());
        responseProducer.send(consumerSession.createMessage());

        assertNotNull("Expected response message did not arrive", replyToConsumer.receive(TIMEOUT));

        // Test Reply To Topic

        replyToDestination = producerSession.createTemporaryTopic();
        replyToConsumer = producerSession.createConsumer(replyToDestination);
        msg = producerSession.createMessage();
        msg.setJMSReplyTo(replyToDestination);
        producer.send(msg);

        m = consumer.receive(TIMEOUT);
        assertNotNull("Expected message did not arrive", m);
        assertNotNull("Message does not have ReplyTo set", m.getJMSReplyTo());

        responseProducer = consumerSession.createProducer(m.getJMSReplyTo());
        responseProducer.send(consumerSession.createMessage());

        assertNotNull("Expected response message did not arrive", replyToConsumer.receive(TIMEOUT));


    }

    private void removeSyntheticProperties(ArrayList<String> propNames)
    {
        Iterator<String> nameIter = propNames.iterator();
        while(nameIter.hasNext())
        {
            String propName = nameIter.next();
            if(propName.startsWith("x-jms") || propName.startsWith("JMS_QPID"))
            {
                nameIter.remove();
            }
        }
    }

    @Override
    public void tearDown() throws Exception
    {
        _connection_0_9_1.close();
        _connection_0_10.close();
        super.tearDown();
    }
}