diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message')
5 files changed, 485 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java new file mode 100644 index 0000000000..8caeaa55c0 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java @@ -0,0 +1,204 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.message; + +import org.apache.qpid.AMQPInvalidClassException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.message.NonQpidObjectMessage; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageFormatException; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; + +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; + +/** + * @author Apache Software Foundation + */ +public class JMSPropertiesTest extends QpidBrokerTestCase +{ + + private static final Logger _logger = LoggerFactory.getLogger(JMSPropertiesTest.class); + + public String _connectionString = "vm://:1"; + + public static final String JMS_CORR_ID = "QPIDID_01"; + public static final int JMS_DELIV_MODE = 1; + public static final String JMS_TYPE = "test.jms.type"; + protected static final String NULL_OBJECT_PROPERTY = "NullObject"; + protected static final String INVALID_OBJECT_PROPERTY = "InvalidObject"; + + protected void setUp() throws Exception + { + super.setUp(); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + } + + public void testJMSProperties() throws Exception + { + AMQConnection con = (AMQConnection) getConnection("guest", "guest"); + AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = + new AMQQueue(con.getDefaultQueueExchangeName(), new AMQShortString("someQ"), new AMQShortString("someQ"), false, + true); + MessageConsumer consumer = consumerSession.createConsumer(queue); + + AMQConnection con2 = (AMQConnection) getConnection("guest", "guest"); + Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(queue); + Destination JMS_REPLY_TO = new AMQQueue(con2, "my.replyto"); + // create a test message to send + ObjectMessage sentMsg = new NonQpidObjectMessage(producerSession); + sentMsg.setJMSCorrelationID(JMS_CORR_ID); + sentMsg.setJMSDeliveryMode(JMS_DELIV_MODE); + sentMsg.setJMSType(JMS_TYPE); + sentMsg.setJMSReplyTo(JMS_REPLY_TO); + + String JMSXGroupID_VALUE = "group"; + sentMsg.setStringProperty("JMSXGroupID", JMSXGroupID_VALUE); + + int JMSXGroupSeq_VALUE = 1; + sentMsg.setIntProperty("JMSXGroupSeq", JMSXGroupSeq_VALUE); + + try + { + sentMsg.setObjectProperty(NULL_OBJECT_PROPERTY, null); + fail("Null Object Property value set"); + } + catch (MessageFormatException mfe) + { + // Check the error message + assertEquals("Incorrect error message", AMQPInvalidClassException.INVALID_OBJECT_MSG + "null", mfe.getMessage()); + } + + try + { + sentMsg.setObjectProperty(INVALID_OBJECT_PROPERTY, new Exception()); + fail("Non primitive Object Property value set"); + } + catch (MessageFormatException mfe) + { + // Check the error message + assertEquals("Incorrect error message: " + mfe.getMessage(), AMQPInvalidClassException.INVALID_OBJECT_MSG + Exception.class, mfe.getMessage()); + } + + // send it + producer.send(sentMsg); + + con2.close(); + + con.start(); + + // get message and check JMS properties + ObjectMessage rm = (ObjectMessage) consumer.receive(2000); + assertNotNull(rm); + + assertEquals("JMS Correlation ID mismatch", sentMsg.getJMSCorrelationID(), rm.getJMSCorrelationID()); + // TODO: Commented out as always overwritten by send delivery mode value - prob should not set in conversion + // assertEquals("JMS Delivery Mode mismatch",sentMsg.getJMSDeliveryMode(),rm.getJMSDeliveryMode()); + assertEquals("JMS Type mismatch", sentMsg.getJMSType(), rm.getJMSType()); + assertEquals("JMS Reply To mismatch", sentMsg.getJMSReplyTo(), rm.getJMSReplyTo()); + assertTrue("JMSMessageID Does not start ID:", rm.getJMSMessageID().startsWith("ID:")); + assertEquals("JMS Default priority should be 4",Message.DEFAULT_PRIORITY,rm.getJMSPriority()); + + //Validate that the JMSX values are correct + assertEquals("JMSXGroupID is not as expected:", JMSXGroupID_VALUE, rm.getStringProperty("JMSXGroupID")); + assertEquals("JMSXGroupSeq is not as expected:", JMSXGroupSeq_VALUE, rm.getIntProperty("JMSXGroupSeq")); + + boolean JMSXGroupID_Available = false; + boolean JMSXGroupSeq_Available = false; + Enumeration props = con.getMetaData().getJMSXPropertyNames(); + while (props.hasMoreElements()) + { + String name = (String) props.nextElement(); + if (name.equals("JMSXGroupID")) + { + JMSXGroupID_Available = true; + } + if (name.equals("JMSXGroupSeq")) + { + JMSXGroupSeq_Available = true; + } + } + + assertTrue("JMSXGroupID not available.",JMSXGroupID_Available); + assertTrue("JMSXGroupSeq not available.",JMSXGroupSeq_Available); + + // Check that the NULL_OBJECT_PROPERTY was not set or transmitted. + assertFalse(NULL_OBJECT_PROPERTY + " was not set.", rm.propertyExists(NULL_OBJECT_PROPERTY)); + + con.close(); + } + + /** + * Test Goal : test if the message properties can be retrieved properly with out an error + * and also test if unsupported properties are filtered out. See QPID-2930. + */ + public void testGetPropertyNames() throws Exception + { + Connection con = getConnection("guest", "guest"); + Session ssn = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + con.start(); + + Topic topic = ssn.createTopic("test"); + MessageConsumer consumer = ssn.createConsumer(topic); + MessageProducer prod = ssn.createProducer(topic); + Message m = ssn.createMessage(); + m.setObjectProperty("x-amqp-0-10.routing-key", "routing-key".getBytes()); + m.setObjectProperty("routing-key", "routing-key"); + prod.send(m); + + Message msg = consumer.receive(1000); + assertNotNull(msg); + + Enumeration<String> enu = msg.getPropertyNames(); + Map<String,String> map = new HashMap<String,String>(); + while (enu.hasMoreElements()) + { + String name = enu.nextElement(); + String value = msg.getStringProperty(name); + map.put(name, value); + } + + assertFalse("Property 'x-amqp-0-10.routing-key' should have been filtered out",map.containsKey("x-amqp-0-10.routing-key")); + assertTrue("Property routing-key should be present",map.containsKey("routing-key")); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java new file mode 100644 index 0000000000..0f799073b4 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java @@ -0,0 +1,162 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.message; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQHeadersExchange; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.url.AMQBindingURL; +import org.apache.qpid.url.BindingURL; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageEOFException; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.StreamMessage; + +/** + * @author Apache Software Foundation + */ +public class StreamMessageTest extends QpidBrokerTestCase +{ + + private static final Logger _logger = LoggerFactory.getLogger(StreamMessageTest.class); + + public String _connectionString = "vm://:1"; + + protected void setUp() throws Exception + { + super.setUp(); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + } + + public void testStreamMessageEOF() throws Exception + { + Connection con = (AMQConnection) getConnection("guest", "guest"); + AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + AMQHeadersExchange queue = + new AMQHeadersExchange(new AMQBindingURL( + ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME + + "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'")); + FieldTable ft = new FieldTable(); + ft.setString("x-match", "any"); + ft.setString("F1000", "1"); + MessageConsumer consumer = + consumerSession.createConsumer(queue, Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), false, false, (String) null, ft); + + // force synch to ensure the consumer has resulted in a bound queue + // ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); + // This is the default now + + Connection con2 = (AMQConnection) getConnection("guest", "guest"); + + AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + // Need to start the "producer" connection in order to receive bounced messages + _logger.info("Starting producer connection"); + con2.start(); + + MessageProducer mandatoryProducer = producerSession.createProducer(queue); + + // Third test - should be routed + _logger.info("Sending isBound message"); + StreamMessage msg = producerSession.createStreamMessage(); + + msg.setStringProperty("F1000", "1"); + + msg.writeByte((byte) 42); + + mandatoryProducer.send(msg); + + _logger.info("Starting consumer connection"); + con.start(); + + StreamMessage msg2 = (StreamMessage) consumer.receive(2000); + assertNotNull(msg2); + + msg2.readByte(); + try + { + msg2.readByte(); + } + catch (Exception e) + { + assertTrue("Expected MessageEOFException: " + e, e instanceof MessageEOFException); + } + con.close(); + con2.close(); + } + + public void testModifyReceivedMessageExpandsBuffer() throws Exception + { + AMQConnection con = (AMQConnection) getConnection("guest", "guest"); + AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + AMQQueue queue = new AMQQueue(con.getDefaultQueueExchangeName(), new AMQShortString("testQ")); + MessageConsumer consumer = consumerSession.createConsumer(queue); + consumer.setMessageListener(new MessageListener() + { + + public void onMessage(Message message) + { + StreamMessage sm = (StreamMessage) message; + try + { + sm.clearBody(); + sm.writeString("dfgjshfslfjshflsjfdlsjfhdsljkfhdsljkfhsd"); + } + catch (JMSException e) + { + _logger.error("Error when writing large string to received msg: " + e, e); + fail("Error when writing large string to received msg" + e); + } + } + }); + + Connection con2 = (AMQConnection) getConnection("guest", "guest"); + AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageProducer mandatoryProducer = producerSession.createProducer(queue); + con.start(); + StreamMessage sm = producerSession.createStreamMessage(); + sm.writeInt(42); + mandatoryProducer.send(sm); + Thread.sleep(2000); + con.close(); + con2.close(); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8En b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8En new file mode 100644 index 0000000000..c9734b1988 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8En @@ -0,0 +1,4 @@ +exhangeName +queueName +routingkey +data
\ No newline at end of file diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Jp b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Jp new file mode 100644 index 0000000000..ae10752dab --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Jp @@ -0,0 +1,4 @@ +設定がそのように構成されていなければな +的某些更新没有出现在这个 README 中。你可以访问下面的 +的发行版本包括多张光盘,其中包括安装光盘和源码光盘 +目のインストール CD は、ほとんどの最近のシス
\ No newline at end of file diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java new file mode 100644 index 0000000000..fe929b4965 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java @@ -0,0 +1,111 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.message; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.Session; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.naming.InitialContext; +import javax.jms.*; +import java.util.Properties; +import java.io.*; + + +/** + * This test makes sure that utf8 characters can be used for + * specifying exchange, queue name and routing key. + * + * those tests are related to qpid-1384 + */ +public class UTF8Test extends QpidBrokerTestCase +{ + private static final Logger _logger = LoggerFactory.getLogger(UTF8Test.class); + + public void testPlainEn() throws Exception + { + invoke("UTF8En"); + } + + + public void testUTF8Jp() throws Exception + { + invoke("UTF8Jp"); + } + + private void invoke(String name) throws Exception + { + String path = System.getProperties().getProperty("QPID_HOME"); + path = path + "/../systests/src/main/java/org/apache/qpid/test/unit/message/" + name; + BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF8")); + runTest(in.readLine(), in.readLine(), in.readLine(), in.readLine()); + in.close(); + } + + private void runTest(String exchangeName, String queueName, String routingKey, String data) throws Exception + { + _logger.info("Running test for exchange: " + exchangeName + + " queue Name: " + queueName + + " routing key: " + routingKey); + declareQueue(exchangeName, routingKey, queueName); + + javax.jms.Connection con = getConnection(); + javax.jms.Session sess = con.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); + Destination dest = getDestination(exchangeName, routingKey, queueName); + // Send data + MessageProducer msgProd = sess.createProducer(dest); + TextMessage message = sess.createTextMessage(data); + msgProd.send(message); + // consume data + MessageConsumer msgCons = sess.createConsumer(dest); + con.start(); + TextMessage m = (TextMessage) msgCons.receive(RECEIVE_TIMEOUT); + assertNotNull(m); + assertEquals(m.getText(), data); + } + + private void declareQueue(String exch, String routkey, String qname) throws Exception + { + Connection conn = new Connection(); + conn.connect("localhost", QpidBrokerTestCase.DEFAULT_PORT, "test", "guest", "guest",false); + Session sess = conn.createSession(0); + sess.exchangeDeclare(exch, "direct", null, null); + sess.queueDeclare(qname, null, null); + sess.exchangeBind(qname, exch, routkey, null); + sess.sync(); + conn.close(); + } + + private Destination getDestination(String exch, String routkey, String qname) throws Exception + { + Properties props = new Properties(); + props.setProperty("destination.directUTF8Queue", + "direct://" + exch + "//" + qname + "?autodelete='false'&durable='false'" + + "&routingkey='" + routkey + "'"); + + // Get our connection context + InitialContext ctx = new InitialContext(props); + return (Destination) ctx.lookup("directUTF8Queue"); + } +} |