diff options
author | Robert Greig <rgreig@apache.org> | 2006-12-12 17:51:55 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2006-12-12 17:51:55 +0000 |
commit | 19176def0ac87f7dc1e6208edb188000ed80025d (patch) | |
tree | 21caabd6f39937c9bd88fec285da04f58304b601 /java | |
parent | 853cf049a66ed4fa727bd2c7c46e0beabeb33a33 (diff) | |
download | qpid-python-19176def0ac87f7dc1e6208edb188000ed80025d.tar.gz |
QPID-174 Fix submitted by Rob Godfrey. Now performs a flip() to ensure the limit is set correctly.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@486255 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
4 files changed, 132 insertions, 24 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 6bd4fd0297..279d861cc2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -505,7 +505,14 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms // position beyond the start if (_data != null) { - _data.rewind(); + if (!_readableMessage) + { + _data.flip(); + } + else + { + _data.rewind(); + } } return _data; } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java index 61f326d52b..7393cea714 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,18 +20,17 @@ */ package org.apache.qpid.client.message; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.AMQException; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; -import javax.jms.ObjectMessage; import javax.jms.JMSException; import javax.jms.MessageFormatException; -import javax.jms.MessageNotWriteableException; +import javax.jms.ObjectMessage; import java.io.*; -import java.nio.charset.Charset; import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage { @@ -94,13 +93,16 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); _data.setAutoExpand(true); } + else + { + _data.rewind(); + } try { ObjectOutputStream out = new ObjectOutputStream(_data.asOutputStream()); out.writeObject(serializable); out.flush(); out.close(); - _data.rewind(); } catch (IOException e) { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java index 9b1637058d..3f726ae5ab 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,28 +19,21 @@ */ package org.apache.qpid.test.unit.basic; +import junit.framework.Assert; +import junit.framework.TestCase; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; -import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.client.message.JMSObjectMessage; -import org.apache.qpid.testutil.VMBrokerSetup; +import org.apache.qpid.client.transport.TransportConnection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.MessageNotWriteableException; +import javax.jms.*; import java.io.Serializable; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import junit.framework.TestCase; -import junit.framework.Assert; - public class ObjectMessageTest extends TestCase implements MessageListener { private AMQConnection _connection; @@ -54,6 +47,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener protected void setUp() throws Exception { super.setUp(); + TransportConnection.createVMBroker(1); try { init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); @@ -67,6 +61,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener protected void tearDown() throws Exception { super.tearDown(); + TransportConnection.killAllVMBrokers(); } private void init(AMQConnection connection) throws Exception @@ -263,6 +258,6 @@ public class ObjectMessageTest extends TestCase implements MessageListener public static junit.framework.Test suite() { - return new VMBrokerSetup(new junit.framework.TestSuite(ObjectMessageTest.class)); + return new junit.framework.TestSuite(ObjectMessageTest.class); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java new file mode 100644 index 0000000000..b987d5f65e --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java @@ -0,0 +1,104 @@ +/** + * User: Robert Greig + * Date: 12-Dec-2006 + ****************************************************************************** + * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of + * this program may be photocopied reproduced or translated to another + * program language without prior written consent of JP Morgan Chase Ltd + ******************************************************************************/ +package org.apache.qpid.test.unit.message; + +import junit.framework.TestCase; +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQHeadersExchange; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.PropertyFieldTable; +import org.apache.qpid.url.AMQBindingURL; +import org.apache.qpid.url.BindingURL; + +import javax.jms.*; + +/** + * @author Apache Software Foundation + */ +public class StreamMessageTest extends TestCase +{ + + private static final Logger _logger = Logger.getLogger(StreamMessageTest.class); + + public String _connectionString = "vm://:1"; + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + } + + + protected void tearDown() throws Exception + { + super.tearDown(); + TransportConnection.killAllVMBrokers(); + } + + public void testStreamMessageEOF() throws Exception + { + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + + AMQHeadersExchange queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS+"://"+ExchangeDefaults.HEADERS_EXCHANGE_NAME+"/test/queue1?"+ BindingURL.OPTION_ROUTING_KEY+"='F0000=1'")); + FieldTable ft = new PropertyFieldTable(); + ft.setString("F1000","1"); + MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String)null, ft); + + + //force synch to ensure the consumer has resulted in a bound queue + ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); + + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + + AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + // Need to start the "producer" connection in order to receive bounced messages + _logger.info("Starting producer connection"); + con2.start(); + + + MessageProducer mandatoryProducer = producerSession.createProducer(queue); + + // Third test - should be routed + _logger.info("Sending routable message"); + StreamMessage msg = producerSession.createStreamMessage(); + + msg.setStringProperty("F1000","1"); + + msg.writeByte((byte)42); + + mandatoryProducer.send(msg); + + + + _logger.info("Starting consumer connection"); + con.start(); + + StreamMessage msg2 = (StreamMessage) consumer.receive(); + + byte b1 = msg2.readByte(); + try + { + byte b2 = msg2.readByte(); + } + catch (Exception e) + { + assertTrue("Expected MessageEOFException: " + e, e instanceof MessageEOFException); + } + + + + } +} |