summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-12 17:51:55 +0000
committerRobert Greig <rgreig@apache.org>2006-12-12 17:51:55 +0000
commit19176def0ac87f7dc1e6208edb188000ed80025d (patch)
tree21caabd6f39937c9bd88fec285da04f58304b601 /java
parent853cf049a66ed4fa727bd2c7c46e0beabeb33a33 (diff)
downloadqpid-python-19176def0ac87f7dc1e6208edb188000ed80025d.tar.gz
QPID-174 Fix submitted by Rob Godfrey. Now performs a flip() to ensure the limit is set correctly.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@486255 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java20
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java23
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java104
4 files changed, 132 insertions, 24 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 6bd4fd0297..279d861cc2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -505,7 +505,14 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
// position beyond the start
if (_data != null)
{
- _data.rewind();
+ if (!_readableMessage)
+ {
+ _data.flip();
+ }
+ else
+ {
+ _data.rewind();
+ }
}
return _data;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
index 61f326d52b..7393cea714 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,18 +20,17 @@
*/
package org.apache.qpid.client.message;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.AMQException;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
-import javax.jms.ObjectMessage;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
-import javax.jms.MessageNotWriteableException;
+import javax.jms.ObjectMessage;
import java.io.*;
-import java.nio.charset.Charset;
import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage
{
@@ -94,13 +93,16 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
_data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
_data.setAutoExpand(true);
}
+ else
+ {
+ _data.rewind();
+ }
try
{
ObjectOutputStream out = new ObjectOutputStream(_data.asOutputStream());
out.writeObject(serializable);
out.flush();
out.close();
- _data.rewind();
}
catch (IOException e)
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java
index 9b1637058d..3f726ae5ab 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,28 +19,21 @@
*/
package org.apache.qpid.test.unit.basic;
+import junit.framework.Assert;
+import junit.framework.TestCase;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
-import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.client.message.JMSObjectMessage;
-import org.apache.qpid.testutil.VMBrokerSetup;
+import org.apache.qpid.client.transport.TransportConnection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.MessageNotWriteableException;
+import javax.jms.*;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import junit.framework.TestCase;
-import junit.framework.Assert;
-
public class ObjectMessageTest extends TestCase implements MessageListener
{
private AMQConnection _connection;
@@ -54,6 +47,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener
protected void setUp() throws Exception
{
super.setUp();
+ TransportConnection.createVMBroker(1);
try
{
init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
@@ -67,6 +61,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener
protected void tearDown() throws Exception
{
super.tearDown();
+ TransportConnection.killAllVMBrokers();
}
private void init(AMQConnection connection) throws Exception
@@ -263,6 +258,6 @@ public class ObjectMessageTest extends TestCase implements MessageListener
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(ObjectMessageTest.class));
+ return new junit.framework.TestSuite(ObjectMessageTest.class);
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
new file mode 100644
index 0000000000..b987d5f65e
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
@@ -0,0 +1,104 @@
+/**
+ * User: Robert Greig
+ * Date: 12-Dec-2006
+ ******************************************************************************
+ * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
+ * this program may be photocopied reproduced or translated to another
+ * program language without prior written consent of JP Morgan Chase Ltd
+ ******************************************************************************/
+package org.apache.qpid.test.unit.message;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQHeadersExchange;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.PropertyFieldTable;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
+
+import javax.jms.*;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class StreamMessageTest extends TestCase
+{
+
+ private static final Logger _logger = Logger.getLogger(StreamMessageTest.class);
+
+ public String _connectionString = "vm://:1";
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ }
+
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ }
+
+ public void testStreamMessageEOF() throws Exception
+ {
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+
+ AMQHeadersExchange queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS+"://"+ExchangeDefaults.HEADERS_EXCHANGE_NAME+"/test/queue1?"+ BindingURL.OPTION_ROUTING_KEY+"='F0000=1'"));
+ FieldTable ft = new PropertyFieldTable();
+ ft.setString("F1000","1");
+ MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String)null, ft);
+
+
+ //force synch to ensure the consumer has resulted in a bound queue
+ ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+
+ AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ // Need to start the "producer" connection in order to receive bounced messages
+ _logger.info("Starting producer connection");
+ con2.start();
+
+
+ MessageProducer mandatoryProducer = producerSession.createProducer(queue);
+
+ // Third test - should be routed
+ _logger.info("Sending routable message");
+ StreamMessage msg = producerSession.createStreamMessage();
+
+ msg.setStringProperty("F1000","1");
+
+ msg.writeByte((byte)42);
+
+ mandatoryProducer.send(msg);
+
+
+
+ _logger.info("Starting consumer connection");
+ con.start();
+
+ StreamMessage msg2 = (StreamMessage) consumer.receive();
+
+ byte b1 = msg2.readByte();
+ try
+ {
+ byte b2 = msg2.readByte();
+ }
+ catch (Exception e)
+ {
+ assertTrue("Expected MessageEOFException: " + e, e instanceof MessageEOFException);
+ }
+
+
+
+ }
+}