diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2012-01-12 17:40:29 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2012-01-12 17:40:29 +0000 |
commit | 0d7401d01394f31b20d23316f79f09f1a197b97f (patch) | |
tree | 8ce4f436271e869af6bc5a46bb3fa2a65b3c6149 | |
parent | e359e9b6de1f6aec94f83072323cb1936fb1594c (diff) | |
download | qpid-python-0d7401d01394f31b20d23316f79f09f1a197b97f.tar.gz |
QPID-3753 : [Java Broker] Improve automatic conversion of messages between 0-8/9/9-1 and 0-10
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1230661 13f79535-47bb-0310-9956-ffa450edef68
9 files changed, 417 insertions, 28 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 82ac01cea8..3f5c204f43 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -266,7 +266,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQSecurityException { - if (!getVirtualHost().getSecurityManager().authorisePublish(info.isImmediate(), info.getRoutingKey().asString(), e.getName())) + String routingKey = info.getRoutingKey() == null ? null : info.getRoutingKey().asString(); + if (!getVirtualHost().getSecurityManager().authorisePublish(info.isImmediate(), routingKey, e.getName())) { throw new AMQSecurityException("Permission denied: " + e.getName()); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java b/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java index 483bca894e..fa06a99204 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java @@ -20,22 +20,23 @@ */ package org.apache.qpid.server.output; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.MessageDeliveryMode; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.*; import org.apache.qpid.AMQPInvalidClassException; +import java.util.HashMap; import java.util.Map; public class HeaderPropertiesConverter { - public static BasicContentHeaderProperties convert(MessageTransferMessage messageTransferMessage) + public static BasicContentHeaderProperties convert(MessageTransferMessage messageTransferMessage, VirtualHost vhost) { BasicContentHeaderProperties props = new BasicContentHeaderProperties(); @@ -82,11 +83,23 @@ public class HeaderPropertiesConverter } if(messageProps.hasMessageId()) { - props.setMessageId(messageProps.getMessageId().toString()); + props.setMessageId("ID:" + messageProps.getMessageId().toString()); } + if(messageProps.hasReplyTo()) + { + ReplyTo replyTo = messageProps.getReplyTo(); + String exchangeName = replyTo.getExchange(); + String routingKey = replyTo.getRoutingKey(); + if(exchangeName == null) + { + exchangeName = ""; + } - // TODO Reply-to + Exchange exchange = vhost.getExchangeRegistry().getExchange(exchangeName); + String exchangeClass = exchange == null ? ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString() : exchange.getType().getName().asString(); + props.setReplyTo(exchangeClass + "://"+exchangeName+"//?routingkey='"+(routingKey==null ? "" : routingKey+"'")); + } if(messageProps.hasUserId()) { props.setUserId(new AMQShortString(messageProps.getUserId())); @@ -94,7 +107,12 @@ public class HeaderPropertiesConverter if(messageProps.hasApplicationHeaders()) { - Map<String, Object> appHeaders = messageProps.getApplicationHeaders(); + Map<String, Object> appHeaders = new HashMap<String, Object>(messageProps.getApplicationHeaders()); + if(messageProps.getApplicationHeaders().containsKey("x-jms-type")) + { + props.setType(String.valueOf(appHeaders.remove("x-jms-type"))); + } + FieldTable ft = new FieldTable(); for(Map.Entry<String, Object> entry : appHeaders.entrySet()) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java index efd904f6aa..1e62e5e9ca 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java @@ -91,7 +91,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter else { final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); - BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message); + BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message, entry.getQueue().getVirtualHost()); ContentHeaderBody chb = new ContentHeaderBody(props, org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl.CLASS_ID); chb.bodySize = message.getSize(); return chb; diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java index 010afcb1a9..78507b0cf2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java @@ -86,7 +86,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter else { final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); - BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message); + BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message, entry.getQueue().getVirtualHost()); ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID); chb.bodySize = message.getSize(); return chb; diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java index 5e2b3e4556..9102b6c651 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java @@ -85,7 +85,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter else { final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); - BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message); + BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message, entry.getQueue().getVirtualHost()); ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID); chb.bodySize = message.getSize(); return chb; diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 3246dc0275..7aac7a3f7f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -51,23 +51,15 @@ import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.transport.ServerSession; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageCreditUnit; -import org.apache.qpid.transport.MessageDeliveryPriority; -import org.apache.qpid.transport.MessageFlowMode; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.Method; -import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.Struct; +import org.apache.qpid.transport.*; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; +import org.apache.qpid.transport.util.Logger; +import org.apache.qpid.url.AMQBindingURL; +import java.net.URISyntaxException; import java.text.MessageFormat; import java.util.Arrays; import java.util.Collections; @@ -85,7 +77,6 @@ import java.nio.ByteBuffer; public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject { - private final long _subscriptionID; private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); @@ -461,7 +452,53 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr messageProps.setCorrelationId(properties.getCorrelationId().getBytes()); } - // TODO - ReplyTo + if(properties.getReplyTo() != null && properties.getReplyTo().length() != 0) + { + String origReplyToString = properties.getReplyTo().asString(); + ReplyTo replyTo = new ReplyTo(); + // if the string looks like a binding URL, then attempt to parse it... + try + { + AMQBindingURL burl = new AMQBindingURL(origReplyToString); + AMQShortString routingKey = burl.getRoutingKey(); + if(routingKey != null) + { + replyTo.setRoutingKey(routingKey.asString()); + } + + AMQShortString exchangeName = burl.getExchangeName(); + if(exchangeName != null) + { + replyTo.setExchange(exchangeName.asString()); + } + } + catch (URISyntaxException e) + { + replyTo.setRoutingKey(origReplyToString); + } + messageProps.setReplyTo(replyTo); + + } + + if(properties.getMessageId() != null) + { + try + { + String messageIdAsString = properties.getMessageIdAsString(); + if(messageIdAsString.startsWith("ID:")) + { + messageIdAsString = messageIdAsString.substring(3); + } + UUID uuid = UUID.fromString(messageIdAsString); + messageProps.setMessageId(uuid); + } + catch(IllegalArgumentException e) + { + // ignore - can't parse + } + } + + if(properties.getUserId() != null) { @@ -470,7 +507,12 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr FieldTable fieldTable = properties.getHeaders(); - final Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable); + Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable); + + if(properties.getType() != null) + { + appHeaders.put("x-jms-type", properties.getTypeAsString()); + } messageProps.setApplicationHeaders(appHeaders); diff --git a/java/systests/src/main/java/org/apache/qpid/server/message/MessageProtocolConversionTest.java b/java/systests/src/main/java/org/apache/qpid/server/message/MessageProtocolConversionTest.java new file mode 100644 index 0000000000..a179b96768 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/message/MessageProtocolConversionTest.java @@ -0,0 +1,324 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + +import org.apache.qpid.AMQException; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.url.AMQBindingURL; + +import javax.jms.*; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; + +public class MessageProtocolConversionTest extends QpidBrokerTestCase +{ + + private static final int TIMEOUT = 1500; + private Connection _connection_0_9_1; + private Connection _connection_0_10; + + private static final boolean BOOLEAN_TEST_VAL = true; + private static final byte BYTE_TEST_VAL = (byte) 4; + private static final byte[] BYTES_TEST_VAL = {5, 4, 3, 2, 1}; + private static final char CHAR_TEST_VAL = 'x'; + private static final double DOUBLE_TEST_VAL = Double.MAX_VALUE; + private static final float FLOAT_TEST_VAL = Float.MAX_VALUE; + private static final int INT_TEST_VAL = -73; + private static final long LONG_TEST_VAL = Long.MIN_VALUE / 2l; + private static final short SHORT_TEST_VAL = -586; + private static final String STRING_TEST_VAL = "This is a test text message"; + + @Override + public void setUp() throws Exception + { + super.setUp(); + setTestSystemProperty(ClientProperties.AMQP_VERSION, "0-10"); + _connection_0_10 = getConnection(); + setTestSystemProperty(ClientProperties.AMQP_VERSION, "0-9-1"); + _connection_0_9_1 = getConnection(); + } + + public void test0_9_1_to_0_10_conversion() throws JMSException, AMQException + { + doConversionTests(_connection_0_9_1, _connection_0_10); + } + + public void test_0_10_to_0_9_1_conversion() throws JMSException, AMQException + { + + doConversionTests(_connection_0_10, _connection_0_9_1); + } + + private void doConversionTests(Connection producerConn, Connection consumerConn) throws JMSException, AMQException + { + Session producerSession = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session consumerSession = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue = getTestQueue(); + + MessageProducer producer = producerSession.createProducer(queue); + MessageConsumer consumer = consumerSession.createConsumer(queue); + + consumerConn.start(); + producerConn.start(); + + // Text Message + + Message m = producerSession.createTextMessage(STRING_TEST_VAL); + producer.send(m); + m = consumer.receive(TIMEOUT); + + assertNotNull("Expected text message did not arrive", m); + assertTrue("Received message not an instance of TextMessage (" + m.getClass().getName() + " instead)", m instanceof TextMessage); + assertEquals("Message text not as expected", STRING_TEST_VAL, ((TextMessage) m).getText()); + + // Map Message + + MapMessage mapMessage = producerSession.createMapMessage(); + mapMessage.setBoolean("boolean", BOOLEAN_TEST_VAL); + mapMessage.setByte("byte", BYTE_TEST_VAL); + mapMessage.setBytes("bytes", BYTES_TEST_VAL); + mapMessage.setChar("char", CHAR_TEST_VAL); + mapMessage.setDouble("double", DOUBLE_TEST_VAL); + mapMessage.setFloat("float", FLOAT_TEST_VAL); + mapMessage.setInt("int", INT_TEST_VAL); + mapMessage.setLong("long", LONG_TEST_VAL); + mapMessage.setShort("short", SHORT_TEST_VAL); + mapMessage.setString("string", STRING_TEST_VAL); + + producer.send(mapMessage); + + m = consumer.receive(TIMEOUT); + + assertNotNull("Expected map message message did not arrive", m); + assertTrue("Received message not an instance of MapMessage (" + m.getClass().getName() + " instead)", m instanceof MapMessage); + MapMessage receivedMapMessage = (MapMessage) m; + assertEquals("Map message boolean value not as expected", BOOLEAN_TEST_VAL, receivedMapMessage.getBoolean("boolean")); + assertEquals("Map message byte value not as expected", BYTE_TEST_VAL, receivedMapMessage.getByte("byte")); + assertTrue("Map message bytes value not as expected", Arrays.equals(BYTES_TEST_VAL, receivedMapMessage.getBytes("bytes"))); + assertEquals("Map message char value not as expected", CHAR_TEST_VAL, receivedMapMessage.getChar("char")); + assertEquals("Map message double value not as expected", DOUBLE_TEST_VAL, receivedMapMessage.getDouble("double")); + assertEquals("Map message float value not as expected", FLOAT_TEST_VAL, receivedMapMessage.getFloat("float")); + assertEquals("Map message int value not as expected", INT_TEST_VAL, receivedMapMessage.getInt("int")); + assertEquals("Map message long value not as expected", LONG_TEST_VAL, receivedMapMessage.getLong("long")); + assertEquals("Map message short value not as expected", SHORT_TEST_VAL, receivedMapMessage.getShort("short")); + assertEquals("Map message string value not as expected", STRING_TEST_VAL, receivedMapMessage.getString("string")); + ArrayList expectedNames = Collections.list(mapMessage.getMapNames()); + Collections.sort(expectedNames); + ArrayList actualNames = Collections.list(receivedMapMessage.getMapNames()); + Collections.sort(actualNames); + assertEquals("Map message keys not as expected", expectedNames, actualNames); + + // Stream Message + + StreamMessage streamMessage = producerSession.createStreamMessage(); + streamMessage.writeString(STRING_TEST_VAL); + streamMessage.writeShort(SHORT_TEST_VAL); + streamMessage.writeLong(LONG_TEST_VAL); + streamMessage.writeInt(INT_TEST_VAL); + streamMessage.writeFloat(FLOAT_TEST_VAL); + streamMessage.writeDouble(DOUBLE_TEST_VAL); + streamMessage.writeChar(CHAR_TEST_VAL); + streamMessage.writeBytes(BYTES_TEST_VAL); + streamMessage.writeByte(BYTE_TEST_VAL); + streamMessage.writeBoolean(BOOLEAN_TEST_VAL); + + producer.send(streamMessage); + + m = consumer.receive(TIMEOUT); + + assertNotNull("Expected stream message message did not arrive", m); + assertTrue("Received message not an instance of StreamMessage (" + m.getClass().getName() + " instead)", m instanceof StreamMessage); + StreamMessage receivedStreamMessage = (StreamMessage) m; + + assertEquals("Stream message read string not as expected", STRING_TEST_VAL, receivedStreamMessage.readString()); + assertEquals("Stream message read short not as expected", SHORT_TEST_VAL, receivedStreamMessage.readShort()); + assertEquals("Stream message read long not as expected", LONG_TEST_VAL, receivedStreamMessage.readLong()); + assertEquals("Stream message read int not as expected", INT_TEST_VAL, receivedStreamMessage.readInt()); + assertEquals("Stream message read float not as expected", FLOAT_TEST_VAL, receivedStreamMessage.readFloat()); + assertEquals("Stream message read double not as expected", DOUBLE_TEST_VAL, receivedStreamMessage.readDouble()); + assertEquals("Stream message read char not as expected", CHAR_TEST_VAL, receivedStreamMessage.readChar()); + byte[] bytesVal = new byte[BYTES_TEST_VAL.length]; + receivedStreamMessage.readBytes(bytesVal); + assertTrue("Stream message read bytes not as expected", Arrays.equals(BYTES_TEST_VAL, bytesVal)); + assertEquals("Stream message read byte not as expected", BYTE_TEST_VAL, receivedStreamMessage.readByte()); + assertEquals("Stream message read boolean not as expected", BOOLEAN_TEST_VAL, receivedStreamMessage.readBoolean()); + + try + { + receivedStreamMessage.readByte(); + fail("Unexpected remaining bytes in stream message"); + } + catch(MessageEOFException e) + { + // pass + } + + // Object Message + + ObjectMessage objectMessage = producerSession.createObjectMessage(); + objectMessage.setObject(STRING_TEST_VAL); + + producer.send(objectMessage); + + m = consumer.receive(TIMEOUT); + + assertNotNull("Expected object message message did not arrive", m); + assertTrue("Received message not an instance of ObjectMessage (" + m.getClass().getName() + " instead)", m instanceof ObjectMessage); + ObjectMessage receivedObjectMessage = (ObjectMessage) m; + assertEquals("Object message value not as expected", STRING_TEST_VAL, receivedObjectMessage.getObject()); + + + // Bytes Message + + BytesMessage bytesMessage = producerSession.createBytesMessage(); + bytesMessage.writeBytes(BYTES_TEST_VAL); + + producer.send(bytesMessage); + + m = consumer.receive(TIMEOUT); + + assertNotNull("Expected bytes message message did not arrive", m); + assertTrue("Received message not an instance of BytesMessage (" + m.getClass().getName() + " instead)", m instanceof BytesMessage); + BytesMessage receivedBytesMessage = (BytesMessage) m; + bytesVal = new byte[BYTES_TEST_VAL.length]; + receivedBytesMessage.readBytes(bytesVal); + assertTrue("Bytes message read bytes not as expected", Arrays.equals(BYTES_TEST_VAL, bytesVal)); + + try + { + receivedBytesMessage.readByte(); + fail("Unexpected remaining bytes in stream message"); + } + catch(MessageEOFException e) + { + // pass + } + + // Headers / properties tests + + Message msg = producerSession.createMessage(); + msg.setJMSCorrelationID("testCorrelationId"); + msg.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT); + msg.setJMSPriority(7); + msg.setJMSType("testType"); + + msg.setBooleanProperty("boolean", BOOLEAN_TEST_VAL); + msg.setByteProperty("byte", BYTE_TEST_VAL); + msg.setDoubleProperty("double", DOUBLE_TEST_VAL); + msg.setFloatProperty("float", FLOAT_TEST_VAL); + msg.setIntProperty("int", INT_TEST_VAL); + msg.setLongProperty("long", LONG_TEST_VAL); + msg.setShortProperty("short", SHORT_TEST_VAL); + msg.setStringProperty("string", STRING_TEST_VAL); + + producer.send(msg); + + m = consumer.receive(TIMEOUT); + assertNotNull("Expected message did not arrive", m); + assertEquals("JMSMessageID differs", msg.getJMSMessageID(), m.getJMSMessageID()); + assertEquals("JMSCorrelationID differs",msg.getJMSCorrelationID(),m.getJMSCorrelationID()); + assertEquals("JMSDeliveryMode differs",msg.getJMSDeliveryMode(),m.getJMSDeliveryMode()); + assertEquals("JMSPriority differs",msg.getJMSPriority(),m.getJMSPriority()); + assertEquals("JMSType differs",msg.getJMSType(),m.getJMSType()); + + assertEquals("Message boolean property not as expected", BOOLEAN_TEST_VAL, m.getBooleanProperty("boolean")); + assertEquals("Message byte property not as expected", BYTE_TEST_VAL, m.getByteProperty("byte")); + assertEquals("Message double property not as expected", DOUBLE_TEST_VAL, m.getDoubleProperty("double")); + assertEquals("Message float property not as expected", FLOAT_TEST_VAL, m.getFloatProperty("float")); + assertEquals("Message int property not as expected", INT_TEST_VAL, m.getIntProperty("int")); + assertEquals("Message long property not as expected", LONG_TEST_VAL, m.getLongProperty("long")); + assertEquals("Message short property not as expected", SHORT_TEST_VAL, m.getShortProperty("short")); + assertEquals("Message string property not as expected", STRING_TEST_VAL, m.getStringProperty("string")); + + ArrayList<String> sentPropNames = Collections.list(msg.getPropertyNames()); + Collections.sort(sentPropNames); + ArrayList<String> receivedPropNames = Collections.list(m.getPropertyNames()); + Collections.sort(receivedPropNames); + + // Shouldn't really need to do this, the client should be hiding these from us + removeSyntheticProperties(sentPropNames); + removeSyntheticProperties(receivedPropNames); + + assertEquals("Property names were not as expected", sentPropNames, receivedPropNames); + + // Test Reply To Queue + + Destination replyToDestination = producerSession.createTemporaryQueue(); + MessageConsumer replyToConsumer = producerSession.createConsumer(replyToDestination); + msg = producerSession.createMessage(); + msg.setJMSReplyTo(replyToDestination); + producer.send(msg); + + m = consumer.receive(TIMEOUT); + assertNotNull("Expected message did not arrive", m); + assertNotNull("Message does not have ReplyTo set", m.getJMSReplyTo()); + + MessageProducer responseProducer = consumerSession.createProducer(m.getJMSReplyTo()); + responseProducer.send(consumerSession.createMessage()); + + assertNotNull("Expected response message did not arrive", replyToConsumer.receive(TIMEOUT)); + + // Test Reply To Topic + + replyToDestination = producerSession.createTemporaryTopic(); + replyToConsumer = producerSession.createConsumer(replyToDestination); + msg = producerSession.createMessage(); + msg.setJMSReplyTo(replyToDestination); + producer.send(msg); + + m = consumer.receive(TIMEOUT); + assertNotNull("Expected message did not arrive", m); + assertNotNull("Message does not have ReplyTo set", m.getJMSReplyTo()); + + responseProducer = consumerSession.createProducer(m.getJMSReplyTo()); + responseProducer.send(consumerSession.createMessage()); + + assertNotNull("Expected response message did not arrive", replyToConsumer.receive(TIMEOUT)); + + + } + + private void removeSyntheticProperties(ArrayList<String> propNames) + { + Iterator<String> nameIter = propNames.iterator(); + while(nameIter.hasNext()) + { + String propName = nameIter.next(); + if(propName.startsWith("x-jms") || propName.startsWith("JMS_QPID")) + { + nameIter.remove(); + } + } + } + + @Override + public void tearDown() throws Exception + { + _connection_0_9_1.close(); + _connection_0_10.close(); + super.tearDown(); + } +} diff --git a/java/test-profiles/CPPExcludes b/java/test-profiles/CPPExcludes index 9e9bf6779a..015aa19a8e 100755 --- a/java/test-profiles/CPPExcludes +++ b/java/test-profiles/CPPExcludes @@ -174,3 +174,5 @@ org.apache.qpid.server.queue.MessageGroupQueueTest#testConsumerCloseGroupAssignm org.apache.qpid.server.queue.MessageGroupQueueTest#testConsumerCloseWithRelease org.apache.qpid.server.queue.MessageGroupQueueTest#testGroupAssignmentSurvivesEmpty +// CPP Broker does not implement message conversion from 0-9-1 +org.apache.qpid.server.message.MessageProtocolConversionTest#* diff --git a/java/test-profiles/JavaPre010Excludes b/java/test-profiles/JavaPre010Excludes index 057d7c2c44..ba96232fe3 100644 --- a/java/test-profiles/JavaPre010Excludes +++ b/java/test-profiles/JavaPre010Excludes @@ -21,8 +21,10 @@ //Exclude the following from brokers using the 0-8/0-9/0-9-1 protocols //====================================================================== -// This test requires a broker capable of 0-8/0-9/0-9-1 and 0-10 concurrently +// These tests requires a broker capable of 0-8/0-9/0-9-1 and 0-10 concurrently org.apache.qpid.test.client.message.JMSDestinationTest#testReceiveResend +org.apache.qpid.server.message.MessageProtocolConversionTest#* + // QPID-2478 test fails when run against broker using 0-8/9 org.apache.qpid.test.client.message.JMSDestinationTest#testGetDestinationWithCustomExchange |