summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-13 11:01:09 +0000
committerRobert Greig <rgreig@apache.org>2006-12-13 11:01:09 +0000
commit67b2d60cd33168e7c2dbcf3f4abd95a6efa9d4b6 (patch)
tree776cf18ccc83fbe70b0264bc6309b0ce56d5923d /java
parent932613bca5c7515437182723c1c915ecf7fee3e2 (diff)
downloadqpid-python-67b2d60cd33168e7c2dbcf3f4abd95a6efa9d4b6.tar.gz
QPID-175 Patch supplied by Rob Godfrey. Now allocates new autoexpanding buffer for StreamMessage when clearBody is called
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@486594 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java11
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java34
2 files changed, 41 insertions, 4 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
index bec0686ce4..6935cde491 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
@@ -59,11 +59,16 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
if (_data == null)
{
- _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE);
- _data.setAutoExpand(true);
+ allocateInitialBuffer();
}
}
+ private void allocateInitialBuffer()
+ {
+ _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE);
+ _data.setAutoExpand(true);
+ }
+
AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
throws AMQException
{
@@ -74,7 +79,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
public void clearBodyImpl() throws JMSException
{
- _data.clear();
+ allocateInitialBuffer();
}
public String toBodyString() throws JMSException
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
index b987d5f65e..50944730c3 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
@@ -13,6 +13,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQHeadersExchange;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.FieldTable;
@@ -97,8 +98,39 @@ public class StreamMessageTest extends TestCase
{
assertTrue("Expected MessageEOFException: " + e, e instanceof MessageEOFException);
}
+ }
+ public void testModifyReceivedMessageExpandsBuffer() throws Exception
+ {
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ AMQQueue queue = new AMQQueue("testQ");
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+ consumer.setMessageListener(new MessageListener()
+ {
-
+ public void onMessage(Message message)
+ {
+ StreamMessage sm = (StreamMessage) message;
+ try
+ {
+ sm.clearBody();
+ sm.writeString("dfgjshfslfjshflsjfdlsjfhdsljkfhdsljkfhsd");
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error when writing large string to received msg: " + e, e);
+ fail("Error when writing large string to received msg" + e);
+ }
+ }
+ });
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer mandatoryProducer = producerSession.createProducer(queue);
+ con.start();
+ StreamMessage sm = producerSession.createStreamMessage();
+ sm.writeInt(42);
+ mandatoryProducer.send(sm);
+ Thread.sleep(2000);
}
}