summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-19 20:48:20 +0000
committerRobert Greig <rgreig@apache.org>2006-12-19 20:48:20 +0000
commit1383e24ec0640fc4ce125aa155faa5a0c6b19ba4 (patch)
tree504c84a66262891461c68598d45d0978e4482f08
parent55a17b9cb6c2cb3c3ebfddc65956a3f00c89ccb7 (diff)
downloadqpid-python-1383e24ec0640fc4ce125aa155faa5a0c6b19ba4.tar.gz
Merge from trunk up to revision 485854
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@488806 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java42
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java31
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java309
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java4
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java177
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java10
14 files changed, 542 insertions, 108 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 0bb8736227..9dcbfca6bc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -54,6 +54,7 @@ import java.net.ConnectException;
import java.nio.channels.UnresolvedAddressException;
import java.text.MessageFormat;
import java.util.ArrayList;
+import java.util.Enumeration;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
@@ -550,8 +551,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public ConnectionMetaData getMetaData() throws JMSException
{
checkNotClosed();
- // TODO Auto-generated method stub
- return null;
+ return QpidConnectionMetaData.instance();
+
}
public ExceptionListener getExceptionListener() throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 8f90913e5c..03c18903e4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -38,6 +38,7 @@ import org.apache.qpid.url.URLSyntaxException;
import javax.jms.*;
import javax.jms.IllegalStateException;
+
import java.io.Serializable;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -279,7 +280,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow);
}
- AMQConnection getAMQConnection()
+ public AMQConnection getAMQConnection()
{
return _connection;
}
@@ -744,6 +745,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
{
+ checkValidDestination(destination);
AMQQueue dest = (AMQQueue) destination;
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination);
return new QueueReceiverAdaptor(dest, consumer);
@@ -759,6 +761,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
{
+ checkValidDestination(destination);
AMQQueue dest = (AMQQueue) destination;
BasicMessageConsumer consumer = (BasicMessageConsumer)
createConsumer(destination, messageSelector);
@@ -767,17 +770,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public MessageConsumer createConsumer(Destination destination) throws JMSException
{
+ checkValidDestination(destination);
return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
+ checkValidDestination(destination);
return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, messageSelector);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
throws JMSException
{
+ checkValidDestination(destination);
return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, messageSelector);
}
@@ -787,6 +793,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
boolean exclusive,
String selector) throws JMSException
{
+ checkValidDestination(destination);
return createConsumer(destination, prefetch, prefetch, noLocal, exclusive, selector, null);
}
@@ -798,6 +805,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
boolean exclusive,
String selector) throws JMSException
{
+ checkValidDestination(destination);
return createConsumer(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null);
}
@@ -808,6 +816,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
String selector,
FieldTable rawSelector) throws JMSException
{
+ checkValidDestination(destination);
return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive,
selector, rawSelector);
}
@@ -820,6 +829,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
String selector,
FieldTable rawSelector) throws JMSException
{
+ checkValidDestination(destination);
return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive,
selector, rawSelector);
}
@@ -1045,6 +1055,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
checkNotClosed();
+ checkValidTopic(topic);
AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
}
@@ -1061,6 +1072,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
{
checkNotClosed();
+ checkValidTopic(topic);
AMQTopic dest = new AMQTopic(topic.getTopicName());
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
}
@@ -1075,6 +1087,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
{
checkNotClosed();
+ checkValidTopic(topic);
AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
}
@@ -1086,6 +1099,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
throws JMSException
{
checkNotClosed();
+ checkValidTopic(topic);
AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
return new TopicSubscriberAdaptor(dest, consumer);
@@ -1094,6 +1108,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TopicPublisher createPublisher(Topic topic) throws JMSException
{
checkNotClosed();
+ checkValidTopic(topic);
//return (TopicPublisher) createProducer(topic);
return new TopicPublisherAdapter(createProducer(topic), topic);
}
@@ -1101,12 +1116,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public QueueBrowser createBrowser(Queue queue) throws JMSException
{
checkNotClosed();
+ checkValidQueue(queue);
throw new UnsupportedOperationException("Queue browsing not supported");
}
public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
{
checkNotClosed();
+ checkValidQueue(queue);
throw new UnsupportedOperationException("Queue browsing not supported");
}
@@ -1124,6 +1141,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void unsubscribe(String name) throws JMSException
{
+ checkNotClosed();
+
//send a queue.delete for the subscription
String queue = _connection.getClientID() + ":" + name;
AMQFrame frame = QueueDeleteBody.createAMQFrame(_channelId, 0, queue, false, false, true);
@@ -1325,4 +1344,25 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, true);
_connection.getProtocolHandler().writeFrame(channelFlowFrame);
}
+
+ /*
+ * I could have combined the last 3 methods, but this way it improves readability
+ */
+ private void checkValidTopic(Topic topic) throws InvalidDestinationException{
+ if (topic == null){
+ throw new javax.jms.InvalidDestinationException("Invalid Topic");
+ }
+ }
+
+ private void checkValidQueue(Queue queue) throws InvalidDestinationException{
+ if (queue == null){
+ throw new javax.jms.InvalidDestinationException("Invalid Queue");
+ }
+ }
+
+ private void checkValidDestination(Destination destination) throws InvalidDestinationException{
+ if (destination == null){
+ throw new javax.jms.InvalidDestinationException("Invalid Queue");
+ }
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index ded2152bf8..4fb62b49fc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -544,7 +544,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
this.checkNotClosed();
if(_session == null || _session.isClosed()){
- throw new UnsupportedOperationException("Invalid Session");
+ throw new javax.jms.IllegalStateException("Invalid Session");
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 8d6287eca3..fd6070a045 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -30,6 +30,7 @@ import org.apache.qpid.framing.*;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
+import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Message;
import java.io.UnsupportedEncodingException;
@@ -231,6 +232,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void send(Message message) throws JMSException
{
checkPreConditions();
+ checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive,
@@ -241,6 +243,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void send(Message message, int deliveryMode) throws JMSException
{
checkPreConditions();
+ checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive,
@@ -251,6 +254,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void send(Message message, int deliveryMode, boolean immediate) throws JMSException
{
checkPreConditions();
+ checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive,
@@ -262,6 +266,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
long timeToLive) throws JMSException
{
checkPreConditions();
+ checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, (AbstractJMSMessage)message, deliveryMode, priority, timeToLive, _mandatory,
@@ -272,6 +277,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void send(Destination destination, Message message) throws JMSException
{
checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -285,6 +291,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
throws JMSException
{
checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -298,6 +305,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
throws JMSException
{
checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -311,6 +319,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
throws JMSException
{
checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -325,6 +334,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
throws JMSException
{
checkPreConditions();
+ checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
@@ -487,17 +497,30 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
_encoding = encoding;
}
- private void checkPreConditions() throws IllegalStateException, JMSException {
+ private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException {
checkNotClosed();
-
+
+ if(_session == null || _session.isClosed()){
+ throw new javax.jms.IllegalStateException("Invalid Session");
+ }
+ }
+
+ private void checkInitialDestination(){
if(_destination == null){
throw new UnsupportedOperationException("Destination is null");
}
+ }
+
+ private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException{
+ if (_destination != null && suppliedDestination != null){
+ throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
+ }
- if(_session == null || _session.isClosed()){
- throw new UnsupportedOperationException("Invalid Session");
+ if (suppliedDestination == null){
+ throw new InvalidDestinationException("Supplied Destination was invalid");
}
}
+
public AMQSession getSession() {
return _session;
diff --git a/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java b/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
new file mode 100644
index 0000000000..10a65c2ad8
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
@@ -0,0 +1,50 @@
+package org.apache.qpid.client;
+
+import java.util.Enumeration;
+
+import javax.jms.ConnectionMetaData;
+import javax.jms.JMSException;
+
+public class QpidConnectionMetaData implements ConnectionMetaData {
+
+ private static QpidConnectionMetaData _instance = new QpidConnectionMetaData();
+
+ private QpidConnectionMetaData(){
+ }
+
+ public static QpidConnectionMetaData instance(){
+ return _instance;
+ }
+
+ public int getJMSMajorVersion() throws JMSException {
+ return 1;
+ }
+
+ public int getJMSMinorVersion() throws JMSException {
+ return 1;
+ }
+
+ public String getJMSProviderName() throws JMSException {
+ return "Apache Qpid";
+ }
+
+ public String getJMSVersion() throws JMSException {
+ return "1.1";
+ }
+
+ public Enumeration getJMSXPropertyNames() throws JMSException {
+ return null;
+ }
+
+ public int getProviderMajorVersion() throws JMSException {
+ return 0;
+ }
+
+ public int getProviderMinorVersion() throws JMSException {
+ return 9;
+ }
+
+ public String getProviderVersion() throws JMSException {
+ return "Incubating-M1";
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java
index 21ec50c046..aeb2afa118 100644
--- a/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java
+++ b/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java
@@ -103,7 +103,7 @@ public class QueueReceiverAdaptor implements QueueReceiver {
AMQSession session = msgConsumer.getSession();
if(session == null || session.isClosed()){
- throw new UnsupportedOperationException("Invalid Session");
+ throw new javax.jms.IllegalStateException("Invalid Session");
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
index 15bf4a125f..f90cc97a80 100644
--- a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
@@ -52,26 +52,32 @@ public class QueueSenderAdapter implements QueueSender {
}
public int getDeliveryMode() throws JMSException {
+ checkPreConditions();
return delegate.getDeliveryMode();
}
public Destination getDestination() throws JMSException {
+ checkPreConditions();
return delegate.getDestination();
}
public boolean getDisableMessageID() throws JMSException {
+ checkPreConditions();
return delegate.getDisableMessageID();
}
public boolean getDisableMessageTimestamp() throws JMSException {
+ checkPreConditions();
return delegate.getDisableMessageTimestamp();
}
public int getPriority() throws JMSException {
+ checkPreConditions();
return delegate.getPriority();
}
public long getTimeToLive() throws JMSException {
+ checkPreConditions();
return delegate.getTimeToLive();
}
@@ -128,7 +134,7 @@ public class QueueSenderAdapter implements QueueSender {
AMQSession session = ((BasicMessageProducer)delegate).getSession();
if(session == null || session.isClosed()){
- throw new UnsupportedOperationException("Invalid Session");
+ throw new javax.jms.IllegalStateException("Invalid Session");
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java b/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java
index 0702202c2a..02da284b83 100644
--- a/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java
@@ -132,7 +132,7 @@ public class TopicPublisherAdapter implements TopicPublisher {
AMQSession session = ((BasicMessageProducer)delegate).getSession();
if(session == null || session.isClosed()){
- throw new UnsupportedOperationException("Invalid Session");
+ throw new javax.jms.IllegalStateException("Invalid Session");
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
index 06e353e271..014c7c3311 100644
--- a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
+++ b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
@@ -116,7 +116,7 @@ class TopicSubscriberAdaptor implements TopicSubscriber
AMQSession session = msgConsumer.getSession();
if(session == null || session.isClosed()){
- throw new UnsupportedOperationException("Invalid Session");
+ throw new javax.jms.IllegalStateException("Invalid Session");
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 329153534b..514287aea7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -384,11 +384,15 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
}
public void acknowledge() throws JMSException
- {
+ {
// the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
// is not specified. In our case, we only set the session field where client acknowledge mode is specified.
if (_session != null)
{
+ if (_session.getAMQConnection().isClosed()){
+ throw new javax.jms.IllegalStateException("Connection is already closed");
+ }
+
// we set multiple to true here since acknowledgement implies acknowledge of all previous messages
// received on the session
_session.acknowledgeMessage(_deliveryTag, true);
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
index cc820a5623..ccb3c0bf57 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
@@ -20,14 +20,11 @@
*/
package org.apache.qpid.client.message;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.AMQException;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ContentHeaderBody;
-import javax.jms.JMSException;
-import javax.jms.MessageEOFException;
-import javax.jms.MessageFormatException;
-import javax.jms.StreamMessage;
+import javax.jms.*;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
@@ -36,18 +33,7 @@ import java.nio.charset.Charset;
*/
public class JMSStreamMessage extends AbstractBytesMessage implements StreamMessage
{
- public static final String MIME_TYPE="jms/stream-message";
-
- private static final String[] _typeNames = { "boolean",
- "byte",
- "byte array",
- "short",
- "char",
- "int",
- "long",
- "float",
- "double",
- "utf string" };
+ public static final String MIME_TYPE="jms/stream-message";
private static final byte BOOLEAN_TYPE = (byte) 1;
@@ -79,7 +65,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
{
this(null);
}
-
+
/**
* Construct a stream message with existing data.
*
@@ -103,25 +89,38 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
return MIME_TYPE;
}
- private void readAndCheckType(byte type) throws MessageFormatException
+ private byte readAndCheckType() throws MessageFormatException, MessageEOFException,
+ MessageNotReadableException
{
- if (_data.get() != type)
- {
- throw new MessageFormatException("Type " + _typeNames[type - 1] + " not found next in stream");
- }
+ checkReadable();
+ checkAvailable(1);
+ return _data.get();
}
- private void writeTypeDiscriminator(byte type)
+ private void writeTypeDiscriminator(byte type) throws MessageNotWriteableException
{
+ checkWritable();
_data.put(type);
}
public boolean readBoolean() throws JMSException
{
- checkReadable();
- checkAvailable(2);
- readAndCheckType(BOOLEAN_TYPE);
- return readBooleanImpl();
+ byte wireType = readAndCheckType();
+ boolean result;
+ switch (wireType)
+ {
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = readBooleanImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Boolean.parseBoolean(readStringImpl());
+ break;
+ default:
+ throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
+ }
+ return result;
}
private boolean readBooleanImpl()
@@ -131,10 +130,22 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public byte readByte() throws JMSException
{
- checkReadable();
- checkAvailable(2);
- readAndCheckType(BYTE_TYPE);
- return readByteImpl();
+ byte wireType = readAndCheckType();
+ byte result;
+ switch (wireType)
+ {
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Byte.parseByte(readStringImpl());
+ break;
+ default:
+ throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
+ }
+ return result;
}
private byte readByteImpl()
@@ -144,10 +155,26 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public short readShort() throws JMSException
{
- checkReadable();
- checkAvailable(3);
- readAndCheckType(SHORT_TYPE);
- return readShortImpl();
+ byte wireType = readAndCheckType();
+ short result;
+ switch (wireType)
+ {
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Short.parseShort(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ throw new MessageFormatException("Unable to convert " + wireType + " to a short");
+ }
+ return result;
}
private short readShortImpl()
@@ -163,10 +190,16 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
*/
public char readChar() throws JMSException
{
- checkReadable();
- checkAvailable(3);
- readAndCheckType(CHAR_TYPE);
- return readCharImpl();
+ byte wireType = readAndCheckType();
+ if (wireType != CHAR_TYPE)
+ {
+ throw new MessageFormatException("Unable to convert " + wireType + " to a char");
+ }
+ else
+ {
+ checkAvailable(2);
+ return readCharImpl();
+ }
}
private char readCharImpl()
@@ -176,10 +209,30 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public int readInt() throws JMSException
{
- checkReadable();
- checkAvailable(5);
- readAndCheckType(INT_TYPE);
- return readIntImpl();
+ byte wireType = readAndCheckType();
+ int result;
+ switch (wireType)
+ {
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Integer.parseInt(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ throw new MessageFormatException("Unable to convert " + wireType + " to an int");
+ }
+ return result;
}
private int readIntImpl()
@@ -189,10 +242,34 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public long readLong() throws JMSException
{
- checkReadable();
- checkAvailable(9);
- readAndCheckType(LONG_TYPE);
- return readLongImpl();
+ byte wireType = readAndCheckType();
+ long result;
+ switch (wireType)
+ {
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = readLongImpl();
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Long.parseLong(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ throw new MessageFormatException("Unable to convert " + wireType + " to a long");
+ }
+ return result;
}
private long readLongImpl()
@@ -202,10 +279,22 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public float readFloat() throws JMSException
{
- checkReadable();
- checkAvailable(5);
- readAndCheckType(FLOAT_TYPE);
- return readFloatImpl();
+ byte wireType = readAndCheckType();
+ float result;
+ switch (wireType)
+ {
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Float.parseFloat(readStringImpl());
+ break;
+ default:
+ throw new MessageFormatException("Unable to convert " + wireType + " to a float");
+ }
+ return result;
}
private float readFloatImpl()
@@ -215,10 +304,26 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public double readDouble() throws JMSException
{
- checkReadable();
- checkAvailable(9);
- readAndCheckType(DOUBLE_TYPE);
- return readDoubleImpl();
+ byte wireType = readAndCheckType();
+ double result;
+ switch (wireType)
+ {
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = readDoubleImpl();
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Double.parseDouble(readStringImpl());
+ break;
+ default:
+ throw new MessageFormatException("Unable to convert " + wireType + " to a double");
+ }
+ return result;
}
private double readDoubleImpl()
@@ -228,12 +333,50 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public String readString() throws JMSException
{
- checkReadable();
- // we check only for one byte plus the type byte since theoretically the string could be only a
- // single byte when using UTF-8 encoding
- checkAvailable(2);
- readAndCheckType(STRING_TYPE);
- return readStringImpl();
+ byte wireType = readAndCheckType();
+ String result;
+ switch (wireType)
+ {
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = readStringImpl();
+ break;
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = String.valueOf(readBooleanImpl());
+ break;
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = String.valueOf(readLongImpl());
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = String.valueOf(readIntImpl());
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = String.valueOf(readShortImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = String.valueOf(readByteImpl());
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = String.valueOf(readFloatImpl());
+ break;
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = String.valueOf(readDoubleImpl());
+ break;
+ case CHAR_TYPE:
+ checkAvailable(2);
+ result = String.valueOf(readCharImpl());
+ break;
+ default:
+ throw new MessageFormatException("Unable to convert " + wireType + " to a String");
+ }
+ return result;
}
private String readStringImpl() throws JMSException
@@ -260,9 +403,15 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
// first call
if (_byteArrayRemaining == -1)
{
- // type discriminator plus array size
- checkAvailable(5);
- readAndCheckType(BYTEARRAY_TYPE);
+ // type discriminator checked separately so you get a MessageFormatException rather than
+ // an EOF even in the case where both would be applicable
+ checkAvailable(1);
+ byte wireType = readAndCheckType();
+ if (wireType != BYTEARRAY_TYPE)
+ {
+ throw new MessageFormatException("Unable to convert " + wireType + " to a byte array");
+ }
+ checkAvailable(4);
int size = _data.getInt();
// size of -1 indicates null
if (size == -1)
@@ -292,7 +441,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
_byteArrayRemaining -= count;
if (_byteArrayRemaining == 0)
{
- _byteArrayRemaining = -1;
+ _byteArrayRemaining = -1;
}
if (count == 0)
{
@@ -307,16 +456,16 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public Object readObject() throws JMSException
{
- checkReadable();
- checkAvailable(1);
- byte type = _data.get();
+ byte wireType = readAndCheckType();
Object result = null;
- switch (type)
+ switch (wireType)
{
case BOOLEAN_TYPE:
+ checkAvailable(1);
result = readBooleanImpl();
break;
case BYTE_TYPE:
+ checkAvailable(1);
result = readByteImpl();
break;
case BYTEARRAY_TYPE:
@@ -334,24 +483,31 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
}
break;
case SHORT_TYPE:
+ checkAvailable(2);
result = readShortImpl();
break;
case CHAR_TYPE:
+ checkAvailable(2);
result = readCharImpl();
break;
case INT_TYPE:
+ checkAvailable(4);
result = readIntImpl();
break;
case LONG_TYPE:
+ checkAvailable(8);
result = readLongImpl();
break;
case FLOAT_TYPE:
+ checkAvailable(4);
result = readFloatImpl();
break;
case DOUBLE_TYPE:
+ checkAvailable(8);
result = readDoubleImpl();
break;
case STRING_TYPE:
+ checkAvailable(1);
result = readStringImpl();
break;
}
@@ -360,63 +516,54 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public void writeBoolean(boolean b) throws JMSException
{
- checkWritable();
writeTypeDiscriminator(BOOLEAN_TYPE);
_data.put(b ? (byte) 1 : (byte) 0);
}
public void writeByte(byte b) throws JMSException
{
- checkWritable();
writeTypeDiscriminator(BYTE_TYPE);
_data.put(b);
}
public void writeShort(short i) throws JMSException
{
- checkWritable();
writeTypeDiscriminator(SHORT_TYPE);
_data.putShort(i);
}
public void writeChar(char c) throws JMSException
{
- checkWritable();
writeTypeDiscriminator(CHAR_TYPE);
_data.putChar(c);
}
public void writeInt(int i) throws JMSException
{
- checkWritable();
writeTypeDiscriminator(INT_TYPE);
_data.putInt(i);
}
public void writeLong(long l) throws JMSException
{
- checkWritable();
writeTypeDiscriminator(LONG_TYPE);
_data.putLong(l);
}
public void writeFloat(float v) throws JMSException
{
- checkWritable();
writeTypeDiscriminator(FLOAT_TYPE);
_data.putFloat(v);
}
public void writeDouble(double v) throws JMSException
{
- checkWritable();
writeTypeDiscriminator(DOUBLE_TYPE);
_data.putDouble(v);
}
public void writeString(String string) throws JMSException
{
- checkWritable();
writeTypeDiscriminator(STRING_TYPE);
try
{
@@ -434,13 +581,11 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public void writeBytes(byte[] bytes) throws JMSException
{
- checkWritable();
writeBytes(bytes, 0, bytes == null?0:bytes.length);
}
public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
{
- checkWritable();
writeTypeDiscriminator(BYTEARRAY_TYPE);
if (bytes == null)
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
index 0da4147351..87b79cde74 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
@@ -96,7 +96,7 @@ public class ConnectionTest extends TestCase
}
}
- public void testUnresolvedHostFailure() throws Exception
+/* public void testUnresolvedHostFailure() throws Exception
{
try
{
@@ -111,7 +111,7 @@ public class ConnectionTest extends TestCase
}
}
}
-
+ */
public void testClientIdCannotBeChanged() throws Exception
{
Connection connection = new AMQConnection(_broker, "guest", "guest",
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java
index af7856a78a..ef00f0b9f2 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java
@@ -24,10 +24,7 @@ import junit.framework.TestCase;
import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.TestMessageHelper;
-import javax.jms.MessageNotReadableException;
-import javax.jms.MessageNotWriteableException;
-import javax.jms.MessageFormatException;
-import javax.jms.MessageEOFException;
+import javax.jms.*;
import java.util.HashMap;
/**
@@ -240,7 +237,7 @@ public class StreamMessageTest extends TestCase
len = bm.readBytes(result);
assertEquals(1, len);
len = bm.readBytes(result);
- assertEquals(2, len);
+ assertEquals(2, len);
}
public void testEOFByte() throws Exception
@@ -418,7 +415,7 @@ public class StreamMessageTest extends TestCase
fail("expected MessageEOFException, got " + e);
}
}
-
+
public void testToBodyStringWithNull() throws Exception
{
JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
@@ -427,6 +424,174 @@ public class StreamMessageTest extends TestCase
assertNull(result);
}
+ private void checkConversionsFail(StreamMessage sm, int[] conversions) throws JMSException
+ {
+ for (int conversion : conversions)
+ {
+ try
+ {
+ switch (conversion)
+ {
+ case 0:
+ sm.readBoolean();
+ break;
+ case 1:
+ sm.readByte();
+ break;
+ case 2:
+ sm.readShort();
+ break;
+ case 3:
+ sm.readChar();
+ break;
+ case 4:
+ sm.readInt();
+ break;
+ case 5:
+ sm.readLong();
+ break;
+ case 6:
+ sm.readFloat();
+ break;
+ case 7:
+ sm.readDouble();
+ break;
+ case 8:
+ sm.readString();
+ break;
+ case 9:
+ sm.readBytes(new byte[3]);
+ break;
+ }
+ fail("MessageFormatException was not thrown");
+ }
+ catch (MessageFormatException e)
+ {
+ // PASS
+ }
+ sm.reset();
+ }
+ }
+ public void testBooleanConversions() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeBoolean(true);
+ bm.reset();
+ String result = bm.readString();
+ assertEquals("true", result);
+ bm.reset();
+ checkConversionsFail(bm, new int[]{1,2,3,4,5,6,7,9});
+ }
+
+ public void testByteConversions() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeByte((byte) 43);
+ bm.reset();
+ assertEquals(43, bm.readShort());
+ bm.reset();
+ assertEquals(43, bm.readInt());
+ bm.reset();
+ assertEquals(43, bm.readLong());
+ bm.reset();
+ String result = bm.readString();
+ assertEquals("43", result);
+ bm.reset();
+ checkConversionsFail(bm, new int[]{0, 3, 6, 7, 9});
+ }
+
+ public void testShortConversions() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeShort((short) 87);
+ bm.reset();
+ assertEquals(87, bm.readInt());
+ bm.reset();
+ assertEquals(87, bm.readLong());
+ bm.reset();
+ assertEquals("87", bm.readString());
+ bm.reset();
+ checkConversionsFail(bm, new int[]{0, 1, 3, 6, 7, });
+ }
+
+ public void testCharConversions() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeChar('d');
+ bm.reset();
+ assertEquals("d", bm.readString());
+ bm.reset();
+ checkConversionsFail(bm, new int[]{0, 1, 2, 4, 5, 6, 7, 9});
+ }
+
+ public void testIntConversions() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeInt(167);
+ bm.reset();
+ assertEquals(167, bm.readLong());
+ bm.reset();
+ assertEquals("167", bm.readString());
+ bm.reset();
+ checkConversionsFail(bm, new int[]{0, 1, 2, 3, 6, 7, 9});
+ }
+
+ public void testLongConversions() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeLong(1678);
+ bm.reset();
+ assertEquals("1678", bm.readString());
+ bm.reset();
+ checkConversionsFail(bm, new int[]{0, 1, 2, 3, 4, 6, 7, 9});
+ }
+
+ public void testFloatConversions() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeFloat(6.2f);
+ bm.reset();
+ assertEquals(6.2d, bm.readDouble(), 0.01);
+ bm.reset();
+ assertEquals("6.2", bm.readString());
+ bm.reset();
+ checkConversionsFail(bm, new int[]{0, 1, 2, 3, 4, 5, 9});
+ }
+
+ public void testDoubleConversions() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeDouble(88.35d);
+ bm.reset();
+ assertEquals("88.35", bm.readString());
+ bm.reset();
+ checkConversionsFail(bm, new int[]{0, 1, 2, 3, 4, 5, 6, 9});
+ }
+
+ public void testStringConversions() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeString("true");
+ bm.reset();
+ assertEquals(true, bm.readBoolean());
+ bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeString("2");
+ bm.reset();
+ assertEquals((byte)2, bm.readByte());
+ bm.reset();
+ assertEquals((short)2, bm.readShort());
+ bm.reset();
+ assertEquals((int)2, bm.readInt());
+ bm.reset();
+ assertEquals((long)2, bm.readLong());
+ bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeString("5.7");
+ bm.reset();
+ assertEquals(5.7f, bm.readFloat());
+ bm.reset();
+ assertEquals(5.7d, bm.readDouble());
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(StreamMessageTest.class);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
index 98e355b0da..eee9b2de9f 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -87,19 +87,19 @@ public class AMQProtocolSessionTest extends TestCase
_testSession.getMinaProtocolSession().setLocalPort(_port);
testAddress = _testSession.genQueueName();
- assertEquals("Failure when generating a queue name from an address with special chars",_generatedAddress,testAddress);
+ assertEquals("Failure when generating a queue exchange from an address with special chars",_generatedAddress,testAddress);
//test empty address
_testSession.getMinaProtocolSession().setStringLocalAddress(_emptyAddress);
testAddress = _testSession.genQueueName();
- assertEquals("Failure when generating a queue name from an empty address",_generatedAddress_2,testAddress);
+ assertEquals("Failure when generating a queue exchange from an empty address",_generatedAddress_2,testAddress);
//test address with no special chars
_testSession.getMinaProtocolSession().setStringLocalAddress(_validAddress);
testAddress = _testSession.genQueueName();
- assertEquals("Failure when generating a queue name from an address with no special chars",_generatedAddress_3,testAddress);
+ assertEquals("Failure when generating a queue exchange from an address with no special chars",_generatedAddress_3,testAddress);
}