/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.test.unit.message; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQHeadersExchange; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageEOFException; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.StreamMessage; /** * @author Apache Software Foundation */ public class StreamMessageTest extends QpidBrokerTestCase { private static final Logger _logger = 
LoggerFactory.getLogger(StreamMessageTest.class);

/**
 * Verifies that reading past the end of a received {@link StreamMessage} body raises
 * {@link MessageEOFException}.
 * <p>
 * A stream message holding a single byte is sent to a headers-exchange destination by a
 * producer on one connection and received by a consumer on another. The first
 * {@code readByte()} must return the byte that was written; the second must fail with
 * {@code MessageEOFException}.
 * <p>
 * Both connections are closed in {@code finally} blocks so that a failed assertion does not
 * leave them open.
 *
 * @throws Exception if connection setup, sending or receiving fails unexpectedly
 */
public void testStreamMessageEOF() throws Exception {
    Connection consumerConnection = (AMQConnection) getConnection("guest", "guest");
    try {
        AMQSession consumerSession =
                (AMQSession) consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

        AMQHeadersExchange destination = new AMQHeadersExchange(new AMQBindingURL(
                ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME
                + "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'"));

        // Arguments passed along with the consumer. NOTE(review): presumably these bind the
        // queue so that any message carrying header F1000=1 is routed to it -- confirm against
        // the headers exchange's x-match semantics.
        FieldTable bindingArgs = new FieldTable();
        bindingArgs.setString("x-match", "any");
        bindingArgs.setString("F1000", "1");

        // The same default prefetch value is supplied for both prefetch arguments.
        int prefetch = Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT);
        MessageConsumer consumer = consumerSession.createConsumer(
                destination, prefetch, prefetch, false, false, (String) null, bindingArgs);

        Connection producerConnection = (AMQConnection) getConnection("guest", "guest");
        try {
            AMQSession producerSession =
                    (AMQSession) producerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

            // Need to start the "producer" connection in order to receive bounced messages
            _logger.info("Starting producer connection");
            producerConnection.start();

            MessageProducer producer = producerSession.createProducer(destination);

            // The message carries the F1000 header set on the consumer's arguments above.
            _logger.info("Sending isBound message");
            StreamMessage sent = producerSession.createStreamMessage();
            sent.setStringProperty("F1000", "1");
            sent.writeByte((byte) 42);
            producer.send(sent);

            _logger.info("Starting consumer connection");
            consumerConnection.start();

            StreamMessage received = (StreamMessage) consumer.receive(2000);
            assertNotNull(received);

            // The single byte written by the producer must be readable...
            final byte payload = received.readByte();
            assertTrue("Unexpected payload byte: " + payload, payload == 42);

            // ...and a second read must run off the end of the body.
            try {
                received.readByte();
                fail("Expected exception not thrown");
            } catch (MessageEOFException expected) {
                // Expected: the body held exactly one byte. Any other exception type
                // propagates and fails the test with its own stack trace.
            }
        } finally {
            producerConnection.close();
        }
    } finally {
        consumerConnection.close();
    }
}
public void testModifyReceivedMessageExpandsBuffer() throws Exception { final CountDownLatch awaitMessages = new CountDownLatch(1); final AtomicReference listenerCaughtException = new AtomicReference(); AMQConnection con = (AMQConnection) getConnection("guest", "guest"); AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); AMQQueue queue = new AMQQueue(con.getDefaultQueueExchangeName(), new AMQShortString("testQ")); MessageConsumer consumer = consumerSession.createConsumer(queue); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { final StreamMessage sm = (StreamMessage) message; try { sm.clearBody(); // it is legal to extend a stream message's content sm.writeString("dfgjshfslfjshflsjfdlsjfhdsljkfhdsljkfhsd"); } catch (Throwable t) { listenerCaughtException.set(t); } finally { awaitMessages.countDown(); } } }); Connection con2 = (AMQConnection) getConnection("guest", "guest"); AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageProducer producer = producerSession.createProducer(queue); con.start(); StreamMessage sm = producerSession.createStreamMessage(); sm.writeInt(42); producer.send(sm); // Allow up to five seconds for the message to arrive with the consumer final boolean completed = awaitMessages.await(5, TimeUnit.SECONDS); assertTrue("Message did not arrive with consumer within a reasonable time", completed); final Throwable listenerException = listenerCaughtException.get(); assertNull("No exception should be caught by listener : " + listenerException, listenerException); con.close(); con2.close(); } }