diff options
author | Robert Greig <rgreig@apache.org> | 2006-12-19 20:48:20 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2006-12-19 20:48:20 +0000 |
commit | 1383e24ec0640fc4ce125aa155faa5a0c6b19ba4 (patch) | |
tree | 504c84a66262891461c68598d45d0978e4482f08 | |
parent | 55a17b9cb6c2cb3c3ebfddc65956a3f00c89ccb7 (diff) | |
download | qpid-python-1383e24ec0640fc4ce125aa155faa5a0c6b19ba4.tar.gz |
Merge from trunk up to revision 485854
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@488806 13f79535-47bb-0310-9956-ffa450edef68
14 files changed, 542 insertions, 108 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 0bb8736227..9dcbfca6bc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -54,6 +54,7 @@ import java.net.ConnectException; import java.nio.channels.UnresolvedAddressException; import java.text.MessageFormat; import java.util.ArrayList; +import java.util.Enumeration; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; @@ -550,8 +551,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public ConnectionMetaData getMetaData() throws JMSException { checkNotClosed(); - // TODO Auto-generated method stub - return null; + return QpidConnectionMetaData.instance(); + } public ExceptionListener getExceptionListener() throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 8f90913e5c..03c18903e4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -38,6 +38,7 @@ import org.apache.qpid.url.URLSyntaxException; import javax.jms.*; import javax.jms.IllegalStateException; + import java.io.Serializable; import java.text.MessageFormat; import java.util.ArrayList; @@ -279,7 +280,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow); } - AMQConnection getAMQConnection() + public AMQConnection getAMQConnection() { return _connection; } @@ -744,6 +745,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public QueueReceiver createQueueReceiver(Destination destination) throws JMSException { + checkValidDestination(destination); AMQQueue dest = (AMQQueue) destination; BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination); return new QueueReceiverAdaptor(dest, consumer); @@ -759,6 +761,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException { + checkValidDestination(destination); AMQQueue dest = (AMQQueue) destination; BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination, messageSelector); @@ -767,17 +770,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MessageConsumer createConsumer(Destination destination) throws JMSException { + checkValidDestination(destination); return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null); } public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { + checkValidDestination(destination); return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, messageSelector); } public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { + checkValidDestination(destination); return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, messageSelector); } @@ -787,6 +793,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi boolean exclusive, String selector) throws JMSException { + checkValidDestination(destination); return createConsumer(destination, prefetch, prefetch, noLocal, exclusive, selector, null); } @@ -798,6 +805,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi boolean exclusive, String selector) throws JMSException { + checkValidDestination(destination); return createConsumer(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null); } @@ -808,6 +816,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String selector, FieldTable rawSelector) throws JMSException { + checkValidDestination(destination); return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, rawSelector); } @@ -820,6 +829,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String selector, FieldTable rawSelector) throws JMSException { + checkValidDestination(destination); return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector); } @@ -1045,6 +1055,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TopicSubscriber createSubscriber(Topic topic) throws JMSException { checkNotClosed(); + checkValidTopic(topic); AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); } @@ -1061,6 +1072,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { checkNotClosed(); + checkValidTopic(topic); AMQTopic dest = new AMQTopic(topic.getTopicName()); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal)); } @@ -1075,6 +1087,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { checkNotClosed(); + checkValidTopic(topic); AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name); return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); } @@ -1086,6 +1099,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi throws JMSException { checkNotClosed(); + checkValidTopic(topic); AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name); BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); return new TopicSubscriberAdaptor(dest, consumer); @@ -1094,6 +1108,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TopicPublisher createPublisher(Topic topic) throws JMSException { checkNotClosed(); + checkValidTopic(topic); //return (TopicPublisher) createProducer(topic); return new TopicPublisherAdapter(createProducer(topic), topic); } @@ -1101,12 +1116,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public QueueBrowser createBrowser(Queue queue) throws JMSException { checkNotClosed(); + checkValidQueue(queue); throw new UnsupportedOperationException("Queue browsing not supported"); } public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { checkNotClosed(); + checkValidQueue(queue); throw new UnsupportedOperationException("Queue browsing not supported"); } @@ -1124,6 +1141,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void unsubscribe(String name) throws JMSException { + checkNotClosed(); + //send a queue.delete for the subscription String queue = _connection.getClientID() + ":" + name; AMQFrame frame = QueueDeleteBody.createAMQFrame(_channelId, 0, queue, false, false, true); @@ -1325,4 +1344,25 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, true); _connection.getProtocolHandler().writeFrame(channelFlowFrame); } + + /* + * I could have combined the last 3 methods, but this way it improves readability + */ + private void checkValidTopic(Topic topic) throws InvalidDestinationException{ + if (topic == null){ + throw new javax.jms.InvalidDestinationException("Invalid Topic"); + } + } + + private void checkValidQueue(Queue queue) throws InvalidDestinationException{ + if (queue == null){ + throw new javax.jms.InvalidDestinationException("Invalid Queue"); + } + } + + private void checkValidDestination(Destination destination) throws InvalidDestinationException{ + if (destination == null){ + throw new javax.jms.InvalidDestinationException("Invalid Queue"); + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index ded2152bf8..4fb62b49fc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -544,7 +544,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer this.checkNotClosed(); if(_session == null || _session.isClosed()){ - throw new UnsupportedOperationException("Invalid Session"); + throw new javax.jms.IllegalStateException("Invalid Session"); } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 8d6287eca3..fd6070a045 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -30,6 +30,7 @@ import org.apache.qpid.framing.*; import javax.jms.DeliveryMode; import javax.jms.Destination; +import javax.jms.InvalidDestinationException; import javax.jms.JMSException; import javax.jms.Message; import java.io.UnsupportedEncodingException; @@ -231,6 +232,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Message message) throws JMSException { checkPreConditions(); + checkInitialDestination(); synchronized (_connection.getFailoverMutex()) { sendImpl(_destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive, @@ -241,6 +243,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Message message, int deliveryMode) throws JMSException { checkPreConditions(); + checkInitialDestination(); synchronized (_connection.getFailoverMutex()) { sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive, @@ -251,6 +254,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Message message, int deliveryMode, boolean immediate) throws JMSException { checkPreConditions(); + checkInitialDestination(); synchronized (_connection.getFailoverMutex()) { sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive, @@ -262,6 +266,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j long timeToLive) throws JMSException { checkPreConditions(); + checkInitialDestination(); synchronized (_connection.getFailoverMutex()) { sendImpl(_destination, (AbstractJMSMessage)message, deliveryMode, priority, timeToLive, _mandatory, @@ -272,6 +277,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Destination destination, Message message) throws JMSException { checkPreConditions(); + checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); @@ -285,6 +291,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j throws JMSException { checkPreConditions(); + checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); @@ -298,6 +305,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j throws JMSException { checkPreConditions(); + checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); @@ -311,6 +319,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j throws JMSException { checkPreConditions(); + checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); @@ -325,6 +334,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j throws JMSException { checkPreConditions(); + checkDestination(destination); synchronized (_connection.getFailoverMutex()) { validateDestination(destination); @@ -487,17 +497,30 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j _encoding = encoding; } - private void checkPreConditions() throws IllegalStateException, JMSException { + private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException { checkNotClosed(); - + + if(_session == null || _session.isClosed()){ + throw new javax.jms.IllegalStateException("Invalid Session"); + } + } + + private void checkInitialDestination(){ if(_destination == null){ throw new UnsupportedOperationException("Destination is null"); } + } + + private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException{ + if (_destination != null && suppliedDestination != null){ + throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination"); + } - if(_session == null || _session.isClosed()){ - throw new UnsupportedOperationException("Invalid Session"); + if (suppliedDestination == null){ + throw new InvalidDestinationException("Supplied Destination was invalid"); } } + public AMQSession getSession() { return _session; diff --git a/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java b/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java new file mode 100644 index 0000000000..10a65c2ad8 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java @@ -0,0 +1,50 @@ +package org.apache.qpid.client; + +import java.util.Enumeration; + +import javax.jms.ConnectionMetaData; +import javax.jms.JMSException; + +public class QpidConnectionMetaData implements ConnectionMetaData { + + private static QpidConnectionMetaData _instance = new QpidConnectionMetaData(); + + private QpidConnectionMetaData(){ + } + + public static QpidConnectionMetaData instance(){ + return _instance; + } + + public int getJMSMajorVersion() throws JMSException { + return 1; + } + + public int getJMSMinorVersion() throws JMSException { + return 1; + } + + public String getJMSProviderName() throws JMSException { + return "Apache Qpid"; + } + + public String getJMSVersion() throws JMSException { + return "1.1"; + } + + public Enumeration getJMSXPropertyNames() throws JMSException { + return null; + } + + public int getProviderMajorVersion() throws JMSException { + return 0; + } + + public int getProviderMinorVersion() throws JMSException { + return 9; + } + + public String getProviderVersion() throws JMSException { + return "Incubating-M1"; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java index 21ec50c046..aeb2afa118 100644 --- a/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java +++ b/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java @@ -103,7 +103,7 @@ public class QueueReceiverAdaptor implements QueueReceiver { AMQSession session = msgConsumer.getSession(); if(session == null || session.isClosed()){ - throw new UnsupportedOperationException("Invalid Session"); + throw new javax.jms.IllegalStateException("Invalid Session"); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java index 15bf4a125f..f90cc97a80 100644 --- a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java @@ -52,26 +52,32 @@ public class QueueSenderAdapter implements QueueSender { } public int getDeliveryMode() throws JMSException { + checkPreConditions(); return delegate.getDeliveryMode(); } public Destination getDestination() throws JMSException { + checkPreConditions(); return delegate.getDestination(); } public boolean getDisableMessageID() throws JMSException { + checkPreConditions(); return delegate.getDisableMessageID(); } public boolean getDisableMessageTimestamp() throws JMSException { + checkPreConditions(); return delegate.getDisableMessageTimestamp(); } public int getPriority() throws JMSException { + checkPreConditions(); return delegate.getPriority(); } public long getTimeToLive() throws JMSException { + checkPreConditions(); return delegate.getTimeToLive(); } @@ -128,7 +134,7 @@ public class QueueSenderAdapter implements QueueSender { AMQSession session = ((BasicMessageProducer)delegate).getSession(); if(session == null || session.isClosed()){ - throw new UnsupportedOperationException("Invalid Session"); + throw new javax.jms.IllegalStateException("Invalid Session"); } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java b/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java index 0702202c2a..02da284b83 100644 --- a/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/client/TopicPublisherAdapter.java @@ -132,7 +132,7 @@ public class TopicPublisherAdapter implements TopicPublisher { AMQSession session = ((BasicMessageProducer)delegate).getSession(); if(session == null || session.isClosed()){ - throw new UnsupportedOperationException("Invalid Session"); + throw new javax.jms.IllegalStateException("Invalid Session"); } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java index 06e353e271..014c7c3311 100644 --- a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java +++ b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java @@ -116,7 +116,7 @@ class TopicSubscriberAdaptor implements TopicSubscriber AMQSession session = msgConsumer.getSession(); if(session == null || session.isClosed()){ - throw new UnsupportedOperationException("Invalid Session"); + throw new javax.jms.IllegalStateException("Invalid Session"); } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 329153534b..514287aea7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -384,11 +384,15 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms } public void acknowledge() throws JMSException - { + { // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge // is not specified. In our case, we only set the session field where client acknowledge mode is specified. if (_session != null) { + if (_session.getAMQConnection().isClosed()){ + throw new javax.jms.IllegalStateException("Connection is already closed"); + } + // we set multiple to true here since acknowledgement implies acknowledge of all previous messages // received on the session _session.acknowledgeMessage(_deliveryTag, true); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index cc820a5623..ccb3c0bf57 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -20,14 +20,11 @@ */ package org.apache.qpid.client.message; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.AMQException; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ContentHeaderBody; -import javax.jms.JMSException; -import javax.jms.MessageEOFException; -import javax.jms.MessageFormatException; -import javax.jms.StreamMessage; +import javax.jms.*; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; @@ -36,18 +33,7 @@ import java.nio.charset.Charset; */ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMessage { - public static final String MIME_TYPE="jms/stream-message"; - - private static final String[] _typeNames = { "boolean", - "byte", - "byte array", - "short", - "char", - "int", - "long", - "float", - "double", - "utf string" }; + public static final String MIME_TYPE="jms/stream-message"; private static final byte BOOLEAN_TYPE = (byte) 1; @@ -79,7 +65,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess { this(null); } - + /** * Construct a stream message with existing data. * @@ -103,25 +89,38 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess return MIME_TYPE; } - private void readAndCheckType(byte type) throws MessageFormatException + private byte readAndCheckType() throws MessageFormatException, MessageEOFException, + MessageNotReadableException { - if (_data.get() != type) - { - throw new MessageFormatException("Type " + _typeNames[type - 1] + " not found next in stream"); - } + checkReadable(); + checkAvailable(1); + return _data.get(); } - private void writeTypeDiscriminator(byte type) + private void writeTypeDiscriminator(byte type) throws MessageNotWriteableException { + checkWritable(); _data.put(type); } public boolean readBoolean() throws JMSException { - checkReadable(); - checkAvailable(2); - readAndCheckType(BOOLEAN_TYPE); - return readBooleanImpl(); + byte wireType = readAndCheckType(); + boolean result; + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Boolean.parseBoolean(readStringImpl()); + break; + default: + throw new MessageFormatException("Unable to convert " + wireType + " to a boolean"); + } + return result; } private boolean readBooleanImpl() @@ -131,10 +130,22 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public byte readByte() throws JMSException { - checkReadable(); - checkAvailable(2); - readAndCheckType(BYTE_TYPE); - return readByteImpl(); + byte wireType = readAndCheckType(); + byte result; + switch (wireType) + { + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Byte.parseByte(readStringImpl()); + break; + default: + throw new MessageFormatException("Unable to convert " + wireType + " to a byte"); + } + return result; } private byte readByteImpl() @@ -144,10 +155,26 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public short readShort() throws JMSException { - checkReadable(); - checkAvailable(3); - readAndCheckType(SHORT_TYPE); - return readShortImpl(); + byte wireType = readAndCheckType(); + short result; + switch (wireType) + { + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Short.parseShort(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + throw new MessageFormatException("Unable to convert " + wireType + " to a short"); + } + return result; } private short readShortImpl() @@ -163,10 +190,16 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess */ public char readChar() throws JMSException { - checkReadable(); - checkAvailable(3); - readAndCheckType(CHAR_TYPE); - return readCharImpl(); + byte wireType = readAndCheckType(); + if (wireType != CHAR_TYPE) + { + throw new MessageFormatException("Unable to convert " + wireType + " to a char"); + } + else + { + checkAvailable(2); + return readCharImpl(); + } } private char readCharImpl() @@ -176,10 +209,30 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public int readInt() throws JMSException { - checkReadable(); - checkAvailable(5); - readAndCheckType(INT_TYPE); - return readIntImpl(); + byte wireType = readAndCheckType(); + int result; + switch (wireType) + { + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Integer.parseInt(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + throw new MessageFormatException("Unable to convert " + wireType + " to an int"); + } + return result; } private int readIntImpl() @@ -189,10 +242,34 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public long readLong() throws JMSException { - checkReadable(); - checkAvailable(9); - readAndCheckType(LONG_TYPE); - return readLongImpl(); + byte wireType = readAndCheckType(); + long result; + switch (wireType) + { + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Long.parseLong(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + throw new MessageFormatException("Unable to convert " + wireType + " to a long"); + } + return result; } private long readLongImpl() @@ -202,10 +279,22 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public float readFloat() throws JMSException { - checkReadable(); - checkAvailable(5); - readAndCheckType(FLOAT_TYPE); - return readFloatImpl(); + byte wireType = readAndCheckType(); + float result; + switch (wireType) + { + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Float.parseFloat(readStringImpl()); + break; + default: + throw new MessageFormatException("Unable to convert " + wireType + " to a float"); + } + return result; } private float readFloatImpl() @@ -215,10 +304,26 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public double readDouble() throws JMSException { - checkReadable(); - checkAvailable(9); - readAndCheckType(DOUBLE_TYPE); - return readDoubleImpl(); + byte wireType = readAndCheckType(); + double result; + switch (wireType) + { + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Double.parseDouble(readStringImpl()); + break; + default: + throw new MessageFormatException("Unable to convert " + wireType + " to a double"); + } + return result; } private double readDoubleImpl() @@ -228,12 +333,50 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public String readString() throws JMSException { - checkReadable(); - // we check only for one byte plus the type byte since theoretically the string could be only a - // single byte when using UTF-8 encoding - checkAvailable(2); - readAndCheckType(STRING_TYPE); - return readStringImpl(); + byte wireType = readAndCheckType(); + String result; + switch (wireType) + { + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + case BOOLEAN_TYPE: + checkAvailable(1); + result = String.valueOf(readBooleanImpl()); + break; + case LONG_TYPE: + checkAvailable(8); + result = String.valueOf(readLongImpl()); + break; + case INT_TYPE: + checkAvailable(4); + result = String.valueOf(readIntImpl()); + break; + case SHORT_TYPE: + checkAvailable(2); + result = String.valueOf(readShortImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = String.valueOf(readByteImpl()); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = String.valueOf(readFloatImpl()); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = String.valueOf(readDoubleImpl()); + break; + case CHAR_TYPE: + checkAvailable(2); + result = String.valueOf(readCharImpl()); + break; + default: + throw new MessageFormatException("Unable to convert " + wireType + " to a String"); + } + return result; } private String readStringImpl() throws JMSException @@ -260,9 +403,15 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess // first call if (_byteArrayRemaining == -1) { - // type discriminator plus array size - checkAvailable(5); - readAndCheckType(BYTEARRAY_TYPE); + // type discriminator checked separately so you get a MessageFormatException rather than + // an EOF even in the case where both would be applicable + checkAvailable(1); + byte wireType = readAndCheckType(); + if (wireType != BYTEARRAY_TYPE) + { + throw new MessageFormatException("Unable to convert " + wireType + " to a byte array"); + } + checkAvailable(4); int size = _data.getInt(); // size of -1 indicates null if (size == -1) @@ -292,7 +441,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess _byteArrayRemaining -= count; if (_byteArrayRemaining == 0) { - _byteArrayRemaining = -1; + _byteArrayRemaining = -1; } if (count == 0) { @@ -307,16 +456,16 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public Object readObject() throws JMSException { - checkReadable(); - checkAvailable(1); - byte type = _data.get(); + byte wireType = readAndCheckType(); Object result = null; - switch (type) + switch (wireType) { case BOOLEAN_TYPE: + checkAvailable(1); result = readBooleanImpl(); break; case BYTE_TYPE: + checkAvailable(1); result = readByteImpl(); break; case BYTEARRAY_TYPE: @@ -334,24 +483,31 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess } break; case SHORT_TYPE: + checkAvailable(2); result = readShortImpl(); break; case CHAR_TYPE: + checkAvailable(2); result = readCharImpl(); break; case INT_TYPE: + checkAvailable(4); result = readIntImpl(); break; case LONG_TYPE: + checkAvailable(8); result = readLongImpl(); break; case FLOAT_TYPE: + checkAvailable(4); result = readFloatImpl(); break; case DOUBLE_TYPE: + checkAvailable(8); result = readDoubleImpl(); break; case STRING_TYPE: + checkAvailable(1); result = readStringImpl(); break; } @@ -360,63 +516,54 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public void writeBoolean(boolean b) throws JMSException { - checkWritable(); writeTypeDiscriminator(BOOLEAN_TYPE); _data.put(b ? (byte) 1 : (byte) 0); } public void writeByte(byte b) throws JMSException { - checkWritable(); writeTypeDiscriminator(BYTE_TYPE); _data.put(b); } public void writeShort(short i) throws JMSException { - checkWritable(); writeTypeDiscriminator(SHORT_TYPE); _data.putShort(i); } public void writeChar(char c) throws JMSException { - checkWritable(); writeTypeDiscriminator(CHAR_TYPE); _data.putChar(c); } public void writeInt(int i) throws JMSException { - checkWritable(); writeTypeDiscriminator(INT_TYPE); _data.putInt(i); } public void writeLong(long l) throws JMSException { - checkWritable(); writeTypeDiscriminator(LONG_TYPE); _data.putLong(l); } public void writeFloat(float v) throws JMSException { - checkWritable(); writeTypeDiscriminator(FLOAT_TYPE); _data.putFloat(v); } public void writeDouble(double v) throws JMSException { - checkWritable(); writeTypeDiscriminator(DOUBLE_TYPE); _data.putDouble(v); } public void writeString(String string) throws JMSException { - checkWritable(); writeTypeDiscriminator(STRING_TYPE); try { @@ -434,13 +581,11 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public void writeBytes(byte[] bytes) throws JMSException { - checkWritable(); writeBytes(bytes, 0, bytes == null?0:bytes.length); } public void writeBytes(byte[] bytes, int offset, int length) throws JMSException { - checkWritable(); writeTypeDiscriminator(BYTEARRAY_TYPE); if (bytes == null) { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index 0da4147351..87b79cde74 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -96,7 +96,7 @@ public class ConnectionTest extends TestCase } } - public void testUnresolvedHostFailure() throws Exception +/* public void testUnresolvedHostFailure() throws Exception { try { @@ -111,7 +111,7 @@ public class ConnectionTest extends TestCase } } } - + */ public void testClientIdCannotBeChanged() throws Exception { Connection connection = new AMQConnection(_broker, "guest", "guest", diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java index af7856a78a..ef00f0b9f2 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java @@ -24,10 +24,7 @@ import junit.framework.TestCase; import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.TestMessageHelper; -import javax.jms.MessageNotReadableException; -import javax.jms.MessageNotWriteableException; -import javax.jms.MessageFormatException; -import javax.jms.MessageEOFException; +import javax.jms.*; import java.util.HashMap; /** @@ -240,7 +237,7 @@ public class StreamMessageTest extends TestCase len = bm.readBytes(result); assertEquals(1, len); len = bm.readBytes(result); - assertEquals(2, len); + assertEquals(2, len); } public void testEOFByte() throws Exception @@ -418,7 +415,7 @@ public class StreamMessageTest extends TestCase fail("expected MessageEOFException, got " + e); } } - + public void testToBodyStringWithNull() throws Exception { JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); @@ -427,6 +424,174 @@ public class StreamMessageTest extends TestCase assertNull(result); } + private void checkConversionsFail(StreamMessage sm, int[] conversions) throws JMSException + { + for (int conversion : conversions) + { + try + { + switch (conversion) + { + case 0: + sm.readBoolean(); + break; + case 1: + sm.readByte(); + break; + case 2: + sm.readShort(); + break; + case 3: + sm.readChar(); + break; + case 4: + sm.readInt(); + break; + case 5: + sm.readLong(); + break; + case 6: + sm.readFloat(); + break; + case 7: + sm.readDouble(); + break; + case 8: + sm.readString(); + break; + case 9: + sm.readBytes(new byte[3]); + break; + } + fail("MessageFormatException was not thrown"); + } + catch (MessageFormatException e) + { + // PASS + } + sm.reset(); + } + } + public void testBooleanConversions() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeBoolean(true); + bm.reset(); + String result = bm.readString(); + assertEquals("true", result); + bm.reset(); + checkConversionsFail(bm, new int[]{1,2,3,4,5,6,7,9}); + } + + public void testByteConversions() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeByte((byte) 43); + bm.reset(); + assertEquals(43, bm.readShort()); + bm.reset(); + assertEquals(43, bm.readInt()); + bm.reset(); + assertEquals(43, bm.readLong()); + bm.reset(); + String result = bm.readString(); + assertEquals("43", result); + bm.reset(); + checkConversionsFail(bm, new int[]{0, 3, 6, 7, 9}); + } + + public void testShortConversions() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeShort((short) 87); + bm.reset(); + assertEquals(87, bm.readInt()); + bm.reset(); + assertEquals(87, bm.readLong()); + bm.reset(); + assertEquals("87", bm.readString()); + bm.reset(); + checkConversionsFail(bm, new int[]{0, 1, 3, 6, 7, }); + } + + public void testCharConversions() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeChar('d'); + bm.reset(); + assertEquals("d", bm.readString()); + bm.reset(); + checkConversionsFail(bm, new int[]{0, 1, 2, 4, 5, 6, 7, 9}); + } + + public void testIntConversions() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeInt(167); + bm.reset(); + assertEquals(167, bm.readLong()); + bm.reset(); + assertEquals("167", bm.readString()); + bm.reset(); + checkConversionsFail(bm, new int[]{0, 1, 2, 3, 6, 7, 9}); + } + + public void testLongConversions() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeLong(1678); + bm.reset(); + assertEquals("1678", bm.readString()); + bm.reset(); + checkConversionsFail(bm, new int[]{0, 1, 2, 3, 4, 6, 7, 9}); + } + + public void testFloatConversions() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeFloat(6.2f); + bm.reset(); + assertEquals(6.2d, bm.readDouble(), 0.01); + bm.reset(); + assertEquals("6.2", bm.readString()); + bm.reset(); + checkConversionsFail(bm, new int[]{0, 1, 2, 3, 4, 5, 9}); + } + + public void testDoubleConversions() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeDouble(88.35d); + bm.reset(); + assertEquals("88.35", bm.readString()); + bm.reset(); + checkConversionsFail(bm, new int[]{0, 1, 2, 3, 4, 5, 6, 9}); + } + + public void testStringConversions() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeString("true"); + bm.reset(); + assertEquals(true, bm.readBoolean()); + bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeString("2"); + bm.reset(); + assertEquals((byte)2, bm.readByte()); + bm.reset(); + assertEquals((short)2, bm.readShort()); + bm.reset(); + assertEquals((int)2, bm.readInt()); + bm.reset(); + assertEquals((long)2, bm.readLong()); + bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeString("5.7"); + bm.reset(); + assertEquals(5.7f, bm.readFloat()); + bm.reset(); + assertEquals(5.7d, bm.readDouble()); + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(StreamMessageTest.class); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java index 98e355b0da..eee9b2de9f 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -87,19 +87,19 @@ public class AMQProtocolSessionTest extends TestCase _testSession.getMinaProtocolSession().setLocalPort(_port); testAddress = _testSession.genQueueName(); - assertEquals("Failure when generating a queue name from an address with special chars",_generatedAddress,testAddress); + assertEquals("Failure when generating a queue exchange from an address with special chars",_generatedAddress,testAddress); //test empty address _testSession.getMinaProtocolSession().setStringLocalAddress(_emptyAddress); testAddress = _testSession.genQueueName(); - assertEquals("Failure when generating a queue name from an empty address",_generatedAddress_2,testAddress); + assertEquals("Failure when generating a queue exchange from an empty address",_generatedAddress_2,testAddress); //test address with no special chars _testSession.getMinaProtocolSession().setStringLocalAddress(_validAddress); testAddress = _testSession.genQueueName(); - assertEquals("Failure when generating a queue name from an address with no special chars",_generatedAddress_3,testAddress); + assertEquals("Failure when generating a queue exchange from an address with no special chars",_generatedAddress_3,testAddress); } |