summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2011-09-04 19:21:45 +0000
committerRobert Godfrey <rgodfrey@apache.org>2011-09-04 19:21:45 +0000
commit656a3125a1ea5b2a4645d11b396d12ed11959dc9 (patch)
tree4c11e46fa8b727eb533c13211dfa806dbbc272d9
parent7c23b488255727d21464c8665cb714f79bbf8a4e (diff)
downloadqpid-python-656a3125a1ea5b2a4645d11b396d12ed11959dc9.tar.gz
NO-JIRA: JMS Fixes
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1165093 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java30
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java61
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java7
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java63
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java34
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java49
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java149
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java29
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java3
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java246
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java178
11 files changed, 755 insertions, 94 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
index f539d6c9c3..318077fd75 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
@@ -24,10 +24,14 @@ import org.apache.qpid.amqp_1_0.jms.Connection;
import org.apache.qpid.amqp_1_0.jms.ConnectionFactory;
import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
import java.net.MalformedURLException;
import java.net.URL;
-public class ConnectionFactoryImpl implements ConnectionFactory
+public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnectionFactory, QueueConnectionFactory
{
private String _host;
private int _port;
@@ -48,12 +52,12 @@ public class ConnectionFactoryImpl implements ConnectionFactory
_clientId = clientId;
}
- public Connection createConnection() throws JMSException
+ public ConnectionImpl createConnection() throws JMSException
{
return new ConnectionImpl(_host, _port, _username, _password, _clientId);
}
- public Connection createConnection(final String username, final String password) throws JMSException
+ public ConnectionImpl createConnection(final String username, final String password) throws JMSException
{
return new ConnectionImpl(_host, _port, username, password, _clientId);
}
@@ -97,4 +101,24 @@ public class ConnectionFactoryImpl implements ConnectionFactory
return new ConnectionFactoryImpl(host, port, username, password, clientId);
}
+
+ public QueueConnection createQueueConnection() throws JMSException
+ {
+ return createConnection();
+ }
+
+ public QueueConnection createQueueConnection(final String username, final String password) throws JMSException
+ {
+ return createConnection(username, password);
+ }
+
+ public TopicConnection createTopicConnection() throws JMSException
+ {
+ return createConnection();
+ }
+
+ public TopicConnection createTopicConnection(final String username, final String password) throws JMSException
+ {
+ return createConnection(username, password);
+ }
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
index 363a56c2a0..f9a983f908 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
@@ -22,16 +22,12 @@ import org.apache.qpid.amqp_1_0.jms.Connection;
import org.apache.qpid.amqp_1_0.jms.ConnectionMetaData;
import org.apache.qpid.amqp_1_0.jms.Session;
-import javax.jms.ConnectionConsumer;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.ServerSessionPool;
-import javax.jms.Topic;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
import java.util.ArrayList;
import java.util.List;
-public class ConnectionImpl implements Connection
+public class ConnectionImpl implements Connection, QueueConnection, TopicConnection
{
private ConnectionMetaData _connectionMetaData;
@@ -97,7 +93,7 @@ public class ConnectionImpl implements Connection
{
if(_state == State.CLOSED)
{
- throw new JMSException("Cannot create a session on a closed connection");
+ throw new IllegalStateException("Cannot create a session on a closed connection");
}
SessionImpl session = new SessionImpl(this, acknowledgeMode);
@@ -111,6 +107,7 @@ public class ConnectionImpl implements Connection
public String getClientID() throws JMSException
{
+ checkClosed();
return null; //TODO
}
@@ -123,16 +120,19 @@ public class ConnectionImpl implements Connection
public ConnectionMetaData getMetaData() throws JMSException
{
+ checkClosed();
return _connectionMetaData;
}
public ExceptionListener getExceptionListener() throws JMSException
{
+ checkClosed();
return _exceptionListener;
}
public void setExceptionListener(final ExceptionListener exceptionListener) throws JMSException
{
+ checkClosed();
_exceptionListener = exceptionListener;
}
@@ -140,7 +140,7 @@ public class ConnectionImpl implements Connection
{
synchronized(_lock)
{
-
+ checkClosed();
if(_state == State.STOPPED)
{
// TODO
@@ -153,6 +153,7 @@ public class ConnectionImpl implements Connection
}
}
+
_lock.notifyAll();
}
@@ -172,7 +173,7 @@ public class ConnectionImpl implements Connection
_state = State.STOPPED;
break;
case CLOSED:
- //TODO
+ throw new javax.jms.IllegalStateException("Closed");
}
_lock.notifyAll();
@@ -198,11 +199,33 @@ public class ConnectionImpl implements Connection
}
}
+ private void checkClosed() throws IllegalStateException
+ {
+ if(_state == State.CLOSED)
+ throw new IllegalStateException("Closed");
+ }
+
public ConnectionConsumer createConnectionConsumer(final Destination destination,
final String s,
final ServerSessionPool serverSessionPool,
final int i) throws JMSException
{
+ checkClosed();
+ return null; //TODO
+ }
+
+ public TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+ {
+ checkClosed();
+ return createSession(transacted, acknowledgeMode);
+ }
+
+ public ConnectionConsumer createConnectionConsumer(final Topic topic,
+ final String s,
+ final ServerSessionPool serverSessionPool,
+ final int i) throws JMSException
+ {
+ checkClosed();
return null; //TODO
}
@@ -212,9 +235,27 @@ public class ConnectionImpl implements Connection
final ServerSessionPool serverSessionPool,
final int i) throws JMSException
{
+ checkClosed();
return null; //TODO
}
+ public QueueSession createQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+ {
+ checkClosed();
+ return createSession(transacted, acknowledgeMode);
+ }
+
+ public ConnectionConsumer createConnectionConsumer(final Queue queue,
+ final String s,
+ final ServerSessionPool serverSessionPool,
+ final int i) throws JMSException
+ {
+ checkClosed();
+ return null; //TODO
+ }
+
+
+
protected org.apache.qpid.amqp_1_0.client.Connection getClientConnection()
{
return _conn;
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java
index de8e138868..8159c7116b 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java
@@ -21,6 +21,9 @@ package org.apache.qpid.amqp_1_0.jms.impl;
import org.apache.qpid.amqp_1_0.jms.ConnectionMetaData;
import javax.jms.JMSException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Enumeration;
public class ConnectionMetaDataImpl implements ConnectionMetaData
@@ -35,6 +38,7 @@ public class ConnectionMetaDataImpl implements ConnectionMetaData
private final int _amqpMajorVersion;
private final int _amqpMinorVersion;
private final int _amqpRevisionVersion;
+ private static final Collection<String> _jmsxProperties = Arrays.asList("JMSXGroupID", "JMSXGroupSeq");
public ConnectionMetaDataImpl(final int amqpMajorVersion, final int amqpMinorVersion, final int amqpRevisionVersion)
{
@@ -80,7 +84,8 @@ public class ConnectionMetaDataImpl implements ConnectionMetaData
public Enumeration getJMSXPropertyNames() throws JMSException
{
- return null; //TODO
+
+ return Collections.enumeration(_jmsxProperties);
}
public int getAMQPMajorVersion()
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
index f214b43997..678a5b5d69 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
@@ -21,15 +21,17 @@ package org.apache.qpid.amqp_1_0.jms.impl;
import org.apache.qpid.amqp_1_0.client.Message;
import org.apache.qpid.amqp_1_0.client.Receiver;
import org.apache.qpid.amqp_1_0.jms.MessageConsumer;
+import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
+import org.apache.qpid.amqp_1_0.jms.Queue;
+import org.apache.qpid.amqp_1_0.jms.Topic;
+import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageListener;
-import javax.jms.Session;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
-public class MessageConsumerImpl implements MessageConsumer
+public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, TopicSubscriber
{
private String _selector;
private boolean _noLocal;
@@ -37,7 +39,9 @@ public class MessageConsumerImpl implements MessageConsumer
private SessionImpl _session;
private Receiver _receiver;
private Binary _lastUnackedMessage;
- private MessageListener _messageListener;
+ MessageListener _messageListener;
+
+ private boolean _closed = false;
MessageConsumerImpl(final Destination destination,
final SessionImpl session,
@@ -67,16 +71,19 @@ public class MessageConsumerImpl implements MessageConsumer
public String getMessageSelector() throws JMSException
{
+ checkClosed();
return _selector;
}
- public MessageListener getMessageListener()
+ public MessageListener getMessageListener() throws IllegalStateException
{
+ checkClosed();
return _messageListener;
}
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
+ checkClosed();
_messageListener = messageListener;
_session.messageListenerSet( this );
_receiver.setMessageArrivalListener(new Receiver.MessageArrivalListener()
@@ -91,11 +98,13 @@ public class MessageConsumerImpl implements MessageConsumer
public MessageImpl receive() throws JMSException
{
+ checkClosed();
return receiveImpl(-1L);
}
public MessageImpl receive(final long timeout) throws JMSException
{
+ checkClosed();
// TODO - validate timeout > 0
return receiveImpl(timeout);
@@ -103,10 +112,11 @@ public class MessageConsumerImpl implements MessageConsumer
public MessageImpl receiveNoWait() throws JMSException
{
+ checkClosed();
return receiveImpl(0L);
}
- private MessageImpl receiveImpl(long timeout)
+ private MessageImpl receiveImpl(long timeout) throws IllegalStateException
{
org.apache.qpid.amqp_1_0.client.Message msg = receive0(timeout);
if(msg != null)
@@ -142,7 +152,21 @@ public class MessageConsumerImpl implements MessageConsumer
public void close() throws JMSException
{
- //TODO
+ if(!_closed)
+ {
+ _closed = true;
+
+ _receiver.close();
+
+ }
+ }
+
+ private void checkClosed() throws IllegalStateException
+ {
+ if(_closed)
+ {
+ throw new javax.jms.IllegalStateException("Closed");
+ }
}
void setLastUnackedMessage(final Binary deliveryTag)
@@ -150,7 +174,7 @@ public class MessageConsumerImpl implements MessageConsumer
_lastUnackedMessage = deliveryTag;
}
- void preReceiveAction(final org.apache.qpid.amqp_1_0.client.Message msg)
+ void preReceiveAction(final org.apache.qpid.amqp_1_0.client.Message msg) throws IllegalStateException
{
final int acknowledgeMode = _session.getAcknowledgeMode();
@@ -173,19 +197,22 @@ public class MessageConsumerImpl implements MessageConsumer
}
}
- public DestinationImpl getDestination()
+ public DestinationImpl getDestination() throws IllegalStateException
{
+ checkClosed();
return _destination;
}
- public SessionImpl getSession()
+ public SessionImpl getSession() throws IllegalStateException
{
+ checkClosed();
return _session;
}
- public boolean getNoLocal()
+ public boolean getNoLocal() throws IllegalStateException
{
+ checkClosed();
return _noLocal;
}
@@ -193,4 +220,14 @@ public class MessageConsumerImpl implements MessageConsumer
{
_receiver.setCredit(UnsignedInteger.valueOf(100), true);
}
+
+ public Queue getQueue() throws JMSException
+ {
+ return (Queue) getDestination();
+ }
+
+ public Topic getTopic() throws JMSException
+ {
+ return (Topic) getDestination();
+ }
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
index e78ab4ffe1..9b61f75a1f 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
@@ -20,10 +20,15 @@
package org.apache.qpid.amqp_1_0.jms.impl;
import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.Section;
import org.apache.qpid.amqp_1_0.type.messaging.*;
import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
import java.util.*;
class MessageFactory
@@ -95,7 +100,32 @@ class MessageFactory
}
else if(bodySection instanceof Data)
{
- message = new BytesMessageImpl(header, properties, appProperties, (Data) bodySection, footer, _session);
+ if(properties != null && ObjectMessageImpl.CONTENT_TYPE.equals(properties.getContentType()))
+ {
+
+ Serializable serializable = null;
+ Binary data = ((Data) bodySection).getValue();
+
+ try
+ {
+ ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data.getArray(), data.getArrayOffset(), data.getLength()));
+ serializable = (Serializable) ois.readObject();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ catch (ClassNotFoundException e)
+ {
+ e.printStackTrace(); //TODO
+ }
+
+ message = new ObjectMessageImpl(header, properties, appProperties, serializable, footer, _session);
+ }
+ else
+ {
+ message = new BytesMessageImpl(header, properties, appProperties, (Data) bodySection, footer, _session);
+ }
}
else if(bodySection instanceof AmqpSequence)
{
@@ -152,6 +182,8 @@ class MessageFactory
message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
}
+ message.setReadOnly();
+
return message;
}
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
index b1c000dd63..cd4381b898 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
@@ -46,12 +46,17 @@ import java.util.*;
public abstract class MessageImpl implements Message
{
+ static final Set<Class> _supportedClasses =
+ new HashSet<Class>(Arrays.asList(Boolean.class, Byte.class, Short.class, Integer.class, Long.class,
+ Float.class, Double.class, Character.class, String.class, byte[].class));
+
private Header _header;
private Properties _properties;
private ApplicationProperties _applicationProperties;
private Footer _footer;
public static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
private SessionImpl _sessionImpl;
+ private boolean _readOnly;
protected MessageImpl(Header header,
Properties properties,
@@ -133,13 +138,13 @@ public abstract class MessageImpl implements Message
public void setJMSCorrelationID(String s) throws JMSException
{
- getProperties().setCorrelationId(new Binary(s.getBytes()));
+ getProperties().setCorrelationId(s == null ? null : new Binary(s.getBytes()));
}
public String getJMSCorrelationID() throws JMSException
{
final Binary id = (Binary) getProperties().getCorrelationId();
- return new String(id.getArray(), id.getArrayOffset(), id.getLength());
+ return id == null ? null : new String(id.getArray(), id.getArrayOffset(), id.getLength());
}
public DestinationImpl getJMSReplyTo() throws JMSException
@@ -253,12 +258,24 @@ public abstract class MessageImpl implements Message
public long getJMSExpiration() throws JMSException
{
- return 0; //TODO
+ final UnsignedInteger ttl = getTtl();
+ return ttl == null || ttl.longValue() == 0 ? 0 : getJMSTimestamp() + ttl.longValue();
}
public void setJMSExpiration(long l) throws JMSException
{
- //TODO
+ if(l == 0)
+ {
+ setTtl(UnsignedInteger.ZERO);
+ }
+ else
+ {
+ if(getTransmitTime() == null)
+ {
+ setTransmitTime(new Date());
+ }
+ setTtl(UnsignedInteger.valueOf(l - getTransmitTime().getDate()));
+ }
}
public int getJMSPriority() throws JMSException
@@ -281,7 +298,7 @@ public abstract class MessageImpl implements Message
public void clearProperties() throws JMSException
{
- //TODO
+ _applicationProperties.getValue().clear();
}
public boolean propertyExists(final String s) throws JMSException
@@ -674,53 +691,62 @@ public abstract class MessageImpl implements Message
public void setBooleanProperty(final String s, final boolean b) throws JMSException
{
+ checkWritable();
setBooleanProperty((Object)s, b);
}
public void setByteProperty(final String s, final byte b) throws JMSException
{
+ checkWritable();
setByteProperty((Object)s, b);
}
public void setShortProperty(final String s, final short i) throws JMSException
{
+ checkWritable();
setShortProperty((Object)s, i);
}
public void setIntProperty(final String s, final int i) throws JMSException
{
+ checkWritable();
setIntProperty((Object)s, i);
}
public void setLongProperty(final String s, final long l) throws JMSException
{
+ checkWritable();
setLongProperty((Object)s, l);
}
public void setFloatProperty(final String s, final float v) throws JMSException
{
+ checkWritable();
setFloatProperty((Object) s, v);
}
public void setDoubleProperty(final String s, final double v) throws JMSException
{
+ checkWritable();
setDoubleProperty((Object)s, v);
}
public void setStringProperty(final String s, final String s1) throws JMSException
{
+ checkWritable();
setStringProperty((Object)s, s1);
}
public void setObjectProperty(final String s, final Object o) throws JMSException
{
- if(o != null && (o.getClass().isPrimitive() || o instanceof String))
+ checkWritable();
+ if(o != null && (_supportedClasses.contains(o.getClass())))
{
setObjectProperty((Object)s, o);
}
else
{
- throw new JMSException("Cannot call setObjectProperty with a value of " + ((o == null) ? "null" : " class "+o.getClass().getName()) + ".");
+ throw new MessageFormatException("Cannot call setObjectProperty with a value of " + ((o == null) ? "null" : " class "+o.getClass().getName()) + ".");
}
}
@@ -958,12 +984,12 @@ public abstract class MessageImpl implements Message
public void clearBody() throws JMSException
{
- //TODO
+ _readOnly = false;
}
protected boolean isReadOnly()
{
- return false; //TODO
+ return _readOnly;
}
protected void checkReadable() throws MessageNotReadableException
@@ -982,6 +1008,11 @@ public abstract class MessageImpl implements Message
}
}
+ public void setReadOnly()
+ {
+ _readOnly = true;
+ }
+
private static class InvalidJMSMEssageIdException extends JMSException
{
public InvalidJMSMEssageIdException(String messageId)
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
index 21629af7b1..bb91552b46 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
@@ -20,15 +20,17 @@ package org.apache.qpid.amqp_1_0.jms.impl;
import org.apache.qpid.amqp_1_0.client.Sender;
import org.apache.qpid.amqp_1_0.jms.MessageProducer;
+import org.apache.qpid.amqp_1_0.jms.Queue;
+import org.apache.qpid.amqp_1_0.jms.QueueSender;
+import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
import java.util.UUID;
-public class MessageProducerImpl implements MessageProducer
+public class MessageProducerImpl implements MessageProducer, QueueSender, TopicPublisher
{
private boolean _disableMessageID;
private boolean _disableMessageTimestamp;
@@ -39,6 +41,7 @@ public class MessageProducerImpl implements MessageProducer
private DestinationImpl _destination;
private SessionImpl _session;
private Sender _sender;
+ private boolean _closed;
protected MessageProducerImpl(final Destination destination,
final SessionImpl session) throws JMSException
@@ -67,64 +70,97 @@ public class MessageProducerImpl implements MessageProducer
}
}
- public boolean getDisableMessageID()
+ private void checkClosed() throws IllegalStateException
{
+ if(_closed)
+ {
+ throw new javax.jms.IllegalStateException("Producer closed");
+ }
+ }
+
+ public boolean getDisableMessageID() throws IllegalStateException
+ {
+ checkClosed();
return _disableMessageID;
}
- public void setDisableMessageID(final boolean disableMessageID)
+ public void setDisableMessageID(final boolean disableMessageID) throws IllegalStateException
{
+ checkClosed();
_disableMessageID = disableMessageID;
}
- public boolean getDisableMessageTimestamp()
+ public boolean getDisableMessageTimestamp() throws IllegalStateException
{
+ checkClosed();
return _disableMessageTimestamp;
}
- public void setDisableMessageTimestamp(final boolean disableMessageTimestamp)
+ public void setDisableMessageTimestamp(final boolean disableMessageTimestamp) throws IllegalStateException
{
+ checkClosed();
_disableMessageTimestamp = disableMessageTimestamp;
}
- public int getDeliveryMode()
+ public int getDeliveryMode() throws IllegalStateException
{
+ checkClosed();
return _deliveryMode;
}
- public void setDeliveryMode(final int deliveryMode)
+ public void setDeliveryMode(final int deliveryMode) throws IllegalStateException
{
+ checkClosed();
_deliveryMode = deliveryMode;
}
- public int getPriority()
+ public int getPriority() throws IllegalStateException
{
+ checkClosed();
return _priority;
}
- public void setPriority(final int priority)
+ public void setPriority(final int priority) throws IllegalStateException
{
+ checkClosed();
_priority = priority;
}
- public long getTimeToLive()
+ public long getTimeToLive() throws IllegalStateException
{
+ checkClosed();
return _timeToLive;
}
- public void setTimeToLive(final long timeToLive)
+ public void setTimeToLive(final long timeToLive) throws IllegalStateException
{
+ checkClosed();
_timeToLive = timeToLive;
}
public DestinationImpl getDestination() throws JMSException
{
+ checkClosed();
return _destination;
}
public void close() throws JMSException
{
- //TODO
+ try
+ {
+ if(!_closed)
+ {
+ _closed = true;
+ _sender.close();
+ }
+
+ }
+ catch (Sender.SenderClosingException e)
+ {
+ final JMSException jmsException = new JMSException("error closing");
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
}
public void send(final Message message) throws JMSException
@@ -142,11 +178,22 @@ public class MessageProducerImpl implements MessageProducer
}
else
{
- msg = convertMessage(message);
+ msg = _session.convertMessage(message);
}
msg.setJMSDeliveryMode(deliveryMode);
msg.setJMSPriority(priority);
+
+ msg.setJMSDestination(_destination);
+
+ long timestamp = 0l;
+
+ if(!getDisableMessageTimestamp() && ttl==0)
+ {
+ timestamp = System.currentTimeMillis();
+ msg.setJMSTimestamp(timestamp);
+
+ }
if(ttl != 0)
{
msg.setTtl(UnsignedInteger.valueOf(ttl));
@@ -155,30 +202,49 @@ public class MessageProducerImpl implements MessageProducer
{
msg.setTtl(null);
}
- msg.setJMSDestination(_destination);
- if(!getDisableMessageTimestamp())
+
+ if(!getDisableMessageID() && msg.getMessageId() == null)
{
- msg.setJMSTimestamp(System.currentTimeMillis());
+ final Binary messageId = generateMessageId();
+ msg.setMessageId(messageId);
+
}
- if(!getDisableMessageID() && msg.getMessageId() == null)
+
+ if(message != msg)
{
- msg.setMessageId(generateMessageId());
+ message.setJMSTimestamp(msg.getJMSTimestamp());
+ message.setJMSMessageID(msg.getJMSMessageID());
+ message.setJMSDeliveryMode(msg.getJMSDeliveryMode());
+ message.setJMSPriority(msg.getJMSPriority());
+ message.setJMSExpiration(msg.getJMSExpiration());
}
+
final org.apache.qpid.amqp_1_0.client.Message clientMessage = new org.apache.qpid.amqp_1_0.client.Message(msg.getSections());
_sender.send(clientMessage);
+
+ if(getDestination() != null)
+ {
+ message.setJMSDestination(getDestination());
+ }
}
- private Binary generateMessageId()
+ public void send(final javax.jms.Queue queue, final Message message) throws JMSException
{
- UUID uuid = UUID.randomUUID();
- return new Binary(uuid.toString().getBytes());
+ send((Destination)queue, message);
+ }
+
+ public void send(final javax.jms.Queue queue, final Message message, final int deliveryMode, final int priority, final long ttl)
+ throws JMSException
+ {
+ send((Destination)queue, message, deliveryMode, priority, ttl);
}
- private MessageImpl convertMessage(final Message message)
+ private Binary generateMessageId()
{
- return null; //TODO
+ UUID uuid = UUID.randomUUID();
+ return new Binary(uuid.toString().getBytes());
}
public void send(final Destination destination, final Message message) throws JMSException
@@ -191,4 +257,35 @@ public class MessageProducerImpl implements MessageProducer
{
//TODO
}
+
+ public Queue getQueue() throws JMSException
+ {
+ return (Queue) getDestination();
+ }
+
+ public Topic getTopic() throws JMSException
+ {
+ return (Topic) getDestination();
+ }
+
+ public void publish(final Message message) throws JMSException
+ {
+ send(message);
+ }
+
+ public void publish(final Message message, final int deliveryMode, final int priority, final long ttl) throws JMSException
+ {
+ send(message, deliveryMode, priority, ttl);
+ }
+
+ public void publish(final Topic topic, final Message message) throws JMSException
+ {
+ send(topic, message);
+ }
+
+ public void publish(final Topic topic, final Message message, final int deliveryMode, final int priority, final long ttl)
+ throws JMSException
+ {
+ send(topic, message, deliveryMode, priority, ttl);
+ }
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java
index 1c95eb8579..35e48bab82 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java
@@ -20,26 +20,34 @@
package org.apache.qpid.amqp_1_0.jms.impl;
import org.apache.qpid.amqp_1_0.jms.ObjectMessage;
+import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.Symbol;
import org.apache.qpid.amqp_1_0.type.messaging.*;
import org.apache.qpid.amqp_1_0.type.messaging.Properties;
import javax.jms.MessageNotWriteableException;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.*;
public class ObjectMessageImpl extends MessageImpl implements ObjectMessage
{
+ static final Symbol CONTENT_TYPE = Symbol.valueOf("application/x-java-serialized-object");
+
private Serializable _object;
protected ObjectMessageImpl(Header header,
Properties properties,
- Footer footer,
ApplicationProperties appProperties,
Serializable object,
+ Footer footer,
SessionImpl session)
{
super(header, properties, appProperties, footer, session);
+ getProperties().setContentType(CONTENT_TYPE);
_object = object;
}
@@ -47,6 +55,7 @@ public class ObjectMessageImpl extends MessageImpl implements ObjectMessage
{
super(new Header(), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
session);
+ getProperties().setContentType(CONTENT_TYPE);
}
public void setObject(final Serializable serializable) throws MessageNotWriteableException
@@ -67,8 +76,22 @@ public class ObjectMessageImpl extends MessageImpl implements ObjectMessage
sections.add(getHeader());
sections.add(getProperties());
sections.add(getApplicationProperties());
- AmqpValue section = new AmqpValue(_object);
- sections.add(section);
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try
+ {
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(_object);
+ oos.flush();
+ oos.close();
+ sections.add(new Data(new Binary(baos.toByteArray())));
+
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace(); //TODO
+ }
+
sections.add(getFooter());
return sections;
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
index e82c2ce364..0106cf13fd 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
@@ -19,6 +19,7 @@
package org.apache.qpid.amqp_1_0.jms.impl;
import org.apache.qpid.amqp_1_0.client.Receiver;
+import org.apache.qpid.amqp_1_0.jms.Queue;
import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
import javax.jms.JMSException;
@@ -39,7 +40,7 @@ public class QueueReceiverImpl extends MessageConsumerImpl implements QueueRecei
return getSession().getClientSession().createMovingReceiver(getDestination().getAddress());
}
- public QueueImpl getQueue() throws JMSException
+ public Queue getQueue() throws JMSException
{
return (QueueImpl) getDestination();
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
index 01594a5a1c..f6378ceb47 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
@@ -20,28 +20,35 @@ package org.apache.qpid.amqp_1_0.jms.impl;
import org.apache.qpid.amqp_1_0.client.Connection;
import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
+import org.apache.qpid.amqp_1_0.jms.QueueSender;
+import org.apache.qpid.amqp_1_0.jms.QueueSession;
import org.apache.qpid.amqp_1_0.jms.Session;
+import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
+import org.apache.qpid.amqp_1_0.jms.TopicSession;
+import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageListener;
-import javax.jms.Queue;
-import javax.jms.Topic;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Enumeration;
import java.util.List;
-public class SessionImpl implements Session
+public class SessionImpl implements Session, QueueSession, TopicSession
{
private ConnectionImpl _connection;
private AcknowledgeMode _acknowledgeMode;
private org.apache.qpid.amqp_1_0.client.Session _session;
private MessageFactory _messageFactory;
private List<MessageConsumerImpl> _consumers = new ArrayList<MessageConsumerImpl>();
+ private List<MessageProducerImpl> _producers = new ArrayList<MessageProducerImpl>();
+
private MessageListener _messageListener;
private Dispatcher _dispatcher = new Dispatcher();
private Thread _dispatcherThread;
+ private boolean _closed;
protected SessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode)
{
@@ -55,29 +62,33 @@ public class SessionImpl implements Session
_dispatcherThread.start();
}
- public BytesMessageImpl createBytesMessage() throws JMSException
+ public BytesMessageImpl createBytesMessage() throws IllegalStateException
{
+ checkClosed();
return new BytesMessageImpl(this);
}
public MapMessageImpl createMapMessage() throws JMSException
{
+ checkClosed();
return new MapMessageImpl(this);
}
- public MessageImpl createMessage() throws JMSException
+ public MessageImpl createMessage() throws IllegalStateException
{
return createAmqpMessage();
}
public ObjectMessageImpl createObjectMessage() throws JMSException
{
+ checkClosed();
return new ObjectMessageImpl(this);
}
public ObjectMessageImpl createObjectMessage(final Serializable serializable) throws JMSException
{
+ checkClosed();
ObjectMessageImpl msg = new ObjectMessageImpl(this);
msg.setObject(serializable);
return msg;
@@ -85,55 +96,81 @@ public class SessionImpl implements Session
public StreamMessageImpl createStreamMessage() throws JMSException
{
+ checkClosed();
return new StreamMessageImpl(this);
}
public TextMessageImpl createTextMessage() throws JMSException
{
+ checkClosed();
return new TextMessageImpl(this);
}
public TextMessageImpl createTextMessage(final String s) throws JMSException
{
+ checkClosed();
TextMessageImpl msg = new TextMessageImpl(this);
msg.setText(s);
return msg;
}
- public AmqpMessageImpl createAmqpMessage() throws JMSException
+ public AmqpMessageImpl createAmqpMessage() throws IllegalStateException
{
+ checkClosed();
return new AmqpMessageImpl(this);
}
public boolean getTransacted() throws JMSException
{
+ checkClosed();
return _acknowledgeMode == AcknowledgeMode.SESSION_TRANSACTED;
}
- public int getAcknowledgeMode()
+ public int getAcknowledgeMode() throws IllegalStateException
{
+ checkClosed();
return _acknowledgeMode.ordinal();
}
public void commit() throws JMSException
{
+ checkClosed();
//TODO
}
public void rollback() throws JMSException
{
+ checkClosed();
//TODO
}
public void close() throws JMSException
{
- _dispatcher.close();
- _session.close();
+ if(!_closed)
+ {
+ _closed = true;
+ _dispatcher.close();
+ for(MessageConsumerImpl consumer : _consumers)
+ {
+ consumer.close();
+ }
+ for(MessageProducerImpl producer : _producers)
+ {
+ producer.close();
+ }
+ _session.close();
+ }
+ }
+ private void checkClosed() throws IllegalStateException
+ {
+ if(_closed)
+ throw new IllegalStateException("Closed");
}
public void recover() throws JMSException
{
+ checkClosed();
//TODO
}
@@ -161,22 +198,31 @@ public class SessionImpl implements Session
public MessageProducerImpl createProducer(final Destination destination) throws JMSException
{
- return new MessageProducerImpl(destination, this);
+ checkClosed();
+
+ final MessageProducerImpl messageProducer = new MessageProducerImpl(destination, this);
+
+ _producers.add(messageProducer);
+
+ return messageProducer;
}
public MessageConsumerImpl createConsumer(final Destination destination) throws JMSException
{
+ checkClosed();
return createConsumer(destination, null, false);
}
public MessageConsumerImpl createConsumer(final Destination destination, final String selector) throws JMSException
{
+ checkClosed();
return createConsumer(destination, selector, false);
}
public MessageConsumerImpl createConsumer(final Destination destination, final String selector, final boolean noLocal)
throws JMSException
{
+ checkClosed();
final MessageConsumerImpl messageConsumer;
synchronized(_session.getEndpoint().getLock())
{
@@ -197,47 +243,92 @@ public class SessionImpl implements Session
public QueueImpl createQueue(final String s) throws JMSException
{
+ checkClosed();
return new QueueImpl(s);
}
+ public QueueReceiver createReceiver(final Queue queue) throws JMSException
+ {
+ checkClosed();
+ return createConsumer(queue);
+ }
+
+ public QueueReceiver createReceiver(final Queue queue, final String selector) throws JMSException
+ {
+ checkClosed();
+ return createConsumer(queue, selector);
+ }
+
+ public QueueSender createSender(final Queue queue) throws JMSException
+ {
+ checkClosed();
+ return createProducer(queue);
+ }
+
public TopicImpl createTopic(final String s) throws JMSException
{
+ checkClosed();
return new TopicImpl(s);
}
+ public TopicSubscriber createSubscriber(final Topic topic) throws JMSException
+ {
+ checkClosed();
+ return createConsumer(topic);
+ }
+
+ public TopicSubscriber createSubscriber(final Topic topic, final String selector, final boolean noLocal) throws JMSException
+ {
+ checkClosed();
+ return createConsumer(topic, selector, noLocal);
+ }
+
public TopicSubscriberImpl createDurableSubscriber(final Topic topic, final String name) throws JMSException
{
+ checkClosed();
return createDurableSubscriber(topic, name, null, false);
}
public TopicSubscriberImpl createDurableSubscriber(final Topic topic, final String name, final String selector, final boolean noLocal)
throws JMSException
{
+ checkClosed();
return null; //TODO
}
+ public TopicPublisher createPublisher(final Topic topic) throws JMSException
+ {
+ checkClosed();
+ return createProducer(topic);
+ }
+
public QueueBrowserImpl createBrowser(final Queue queue) throws JMSException
{
+ checkClosed();
return createBrowser(queue, null);
}
public QueueBrowserImpl createBrowser(final Queue queue, final String selector) throws JMSException
{
+ checkClosed();
return null; //TODO
}
public TemporaryQueueImpl createTemporaryQueue() throws JMSException
{
+ checkClosed();
return null; //TODO
}
public TemporaryTopicImpl createTemporaryTopic() throws JMSException
{
+ checkClosed();
return null; //TODO
}
public void unsubscribe(final String s) throws JMSException
{
+ checkClosed();
//TODO
}
@@ -286,6 +377,133 @@ public class SessionImpl implements Session
_dispatcher.messageArrivedAtConsumer(messageConsumer);
}
+ MessageImpl convertMessage(final javax.jms.Message message) throws JMSException
+ {
+ MessageImpl replacementMessage;
+
+ if(message instanceof BytesMessage)
+ {
+ replacementMessage = convertBytesMessage((BytesMessage) message);
+ }
+ else if(message instanceof MapMessage)
+ {
+ replacementMessage = convertMapMessage((MapMessage) message);
+ }
+ else if(message instanceof ObjectMessage)
+ {
+ replacementMessage = convertObjectMessage((ObjectMessage) message);
+ }
+ else if(message instanceof StreamMessage)
+ {
+ replacementMessage = convertStreamMessage((StreamMessage) message);
+ }
+ else if(message instanceof TextMessage)
+ {
+ replacementMessage = convertTextMessage((TextMessage) message);
+ }
+ else
+ {
+ replacementMessage = createMessage();
+ }
+
+ convertMessageProperties(message, replacementMessage);
+
+ return replacementMessage;
+ }
+
+
+ private void convertMessageProperties(final javax.jms.Message message, final MessageImpl replacementMessage)
+ throws JMSException
+ {
+ Enumeration propertyNames = message.getPropertyNames();
+ while (propertyNames.hasMoreElements())
+ {
+ String propertyName = String.valueOf(propertyNames.nextElement());
+ // TODO: Shouldn't need to check for JMS properties here as don't think getPropertyNames() should return them
+ if (!propertyName.startsWith("JMSX_"))
+ {
+ Object value = message.getObjectProperty(propertyName);
+ replacementMessage.setObjectProperty(propertyName, value);
+ }
+ }
+
+
+ replacementMessage.setJMSDeliveryMode(message.getJMSDeliveryMode());
+
+ if (message.getJMSReplyTo() != null)
+ {
+ replacementMessage.setJMSReplyTo(message.getJMSReplyTo());
+ }
+
+ replacementMessage.setJMSType(message.getJMSType());
+
+ replacementMessage.setJMSCorrelationID(message.getJMSCorrelationID());
+ }
+
+ private MessageImpl convertMapMessage(final MapMessage message) throws JMSException
+ {
+ MapMessageImpl mapMessage = createMapMessage();
+
+ Enumeration mapNames = message.getMapNames();
+ while (mapNames.hasMoreElements())
+ {
+ String name = (String) mapNames.nextElement();
+ mapMessage.setObject(name, message.getObject(name));
+ }
+
+ return mapMessage;
+ }
+
+ private MessageImpl convertBytesMessage(final BytesMessage message) throws JMSException
+ {
+ BytesMessageImpl bytesMessage = createBytesMessage();
+
+ message.reset();
+
+ byte[] buf = new byte[1024];
+
+ int len;
+
+ while ((len = message.readBytes(buf)) != -1)
+ {
+ bytesMessage.writeBytes(buf, 0, len);
+ }
+
+ return bytesMessage;
+ }
+
+ private MessageImpl convertObjectMessage(final ObjectMessage message) throws JMSException
+ {
+ ObjectMessageImpl objectMessage = createObjectMessage();
+ objectMessage.setObject(message.getObject());
+ return objectMessage;
+ }
+
+ private MessageImpl convertStreamMessage(final StreamMessage message) throws JMSException
+ {
+ StreamMessageImpl streamMessage = createStreamMessage();
+
+ try
+ {
+ message.reset();
+ while (true)
+ {
+ streamMessage.writeObject(message.readObject());
+ }
+ }
+ catch (MessageEOFException e)
+ {
+ // we're at the end so don't mind the exception
+ }
+
+ return streamMessage;
+ }
+
+ private MessageImpl convertTextMessage(final TextMessage message) throws JMSException
+ {
+ return createTextMessage(message.getText());
+ }
+
private class Dispatcher implements Runnable
{
@@ -315,7 +533,7 @@ public class SessionImpl implements Session
while(_started && !_messageConsumerList.isEmpty())
{
MessageConsumerImpl consumer = _messageConsumerList.remove(0);
- MessageListener listener = consumer.getMessageListener();
+ MessageListener listener = consumer._messageListener;
Message msg = consumer.receive0(0L);
MessageImpl message = consumer.createJMSMessage(msg);
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java
index 50238460b2..b4f1e8c8e0 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java
@@ -25,6 +25,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.*;
import org.apache.qpid.amqp_1_0.type.messaging.Properties;
import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
import java.util.*;
public class StreamMessageImpl extends MessageImpl implements StreamMessage
@@ -34,6 +35,8 @@ public class StreamMessageImpl extends MessageImpl implements StreamMessage
private int _position = -1;
private int _offset = -1;
+
+
protected StreamMessageImpl(Header header, Properties properties, ApplicationProperties appProperties, List list,
Footer footer, SessionImpl session)
{
@@ -59,52 +62,197 @@ public class StreamMessageImpl extends MessageImpl implements StreamMessage
public boolean readBoolean() throws JMSException
{
- return false; //TODO
+ Object obj = readObject();
+ if(obj instanceof Boolean)
+ {
+ return (Boolean) obj;
+ }
+ if(obj instanceof String || obj == null)
+ {
+ return Boolean.valueOf((String)obj);
+ }
+ else
+ {
+ throw new MessageFormatException("Cannot read " + obj.getClass().getName() + " as boolean");
+ }
+ }
+
+ @Override
+ public void clearBody() throws JMSException
+ {
+ super.clearBody();
+ _list.clear();
+ _position = -1;
+ _offset = -1;
}
public byte readByte() throws JMSException
{
- return 0; //TODO
+ Object obj = readObject();
+ if(obj instanceof Byte)
+ {
+ return (Byte) obj;
+ }
+ else if(obj instanceof String || obj == null)
+ {
+ return Byte.valueOf((String)obj);
+ }
+ else
+ {
+ throw new MessageFormatException("Cannot convert value of type " + obj.getClass().getName());
+ }
}
public short readShort() throws JMSException
{
- return 0; //TODO
+ Object obj = readObject();
+ if(obj instanceof Short)
+ {
+ return (Short) obj;
+ }
+ else if(obj instanceof Byte)
+ {
+ return (Byte) obj;
+ }
+ else if(obj instanceof String || obj == null)
+ {
+ return Short.valueOf((String)obj);
+ }
+ else
+ {
+ throw new MessageFormatException("Cannot convert value of type " + obj.getClass().getName());
+ }
+
}
public char readChar() throws JMSException
{
- return 0; //TODO
+ Object obj = readObject();
+ if(obj instanceof Character)
+ {
+ return (Character) obj;
+ }
+ if(obj == null)
+ {
+ throw new NullPointerException();
+ }
+ else
+ {
+ throw new MessageFormatException("Cannot read " + obj.getClass().getName() + " as boolean");
+ }
+
}
public int readInt() throws JMSException
{
- return 0; //TODO
+ Object obj = readObject();
+ if(obj instanceof Integer)
+ {
+ return (Integer) obj;
+ }
+ else if(obj instanceof Short)
+ {
+ return (Short) obj;
+ }
+ else if(obj instanceof Byte)
+ {
+ return (Byte) obj;
+ }
+ else if(obj instanceof String || obj == null)
+ {
+ return Integer.valueOf((String)obj);
+ }
+ else
+ {
+ throw new MessageFormatException("Cannot convert value of type " + obj.getClass().getName());
+ }
}
public long readLong() throws JMSException
{
- return 0; //TODO
+ Object obj = readObject();
+ if(obj instanceof Long)
+ {
+ return (Long) obj;
+ }
+ else if(obj instanceof Integer)
+ {
+ return (Integer) obj;
+ }
+ else if(obj instanceof Short)
+ {
+ return (Short) obj;
+ }
+ else if(obj instanceof Byte)
+ {
+ return (Byte) obj;
+ }
+ else if(obj instanceof String || obj == null)
+ {
+ return Long.valueOf((String)obj);
+ }
+ else
+ {
+ throw new MessageFormatException("Cannot convert value of type " + obj.getClass().getName());
+ }
}
public float readFloat() throws JMSException
{
- return 0; //TODO
+ Object obj = readObject();
+ if(obj instanceof Float)
+ {
+ return (Float) obj;
+ }
+ else if(obj instanceof String || obj == null)
+ {
+ return Float.valueOf((String)obj);
+ }
+ else
+ {
+ throw new MessageFormatException("Cannot convert value of type " + obj.getClass().getName());
+ }
}
public double readDouble() throws JMSException
{
- return 0; //TODO
+ Object obj = readObject();
+ if(obj instanceof Double)
+ {
+ return (Double) obj;
+ }
+ else if(obj instanceof Float)
+ {
+ return (Float) obj;
+ }
+ else if(obj instanceof String || obj == null)
+ {
+ return Double.valueOf((String)obj);
+ }
+ else
+ {
+ throw new MessageFormatException("Cannot convert value of type " + obj.getClass().getName());
+ }
}
public String readString() throws JMSException
{
- return null; //TODO
+ Object obj = readObject();
+ if(obj instanceof byte[])
+ {
+ throw new MessageFormatException("Cannot convert value of type " + obj.getClass().getName());
+ }
+ return String.valueOf(obj);
}
public int readBytes(final byte[] bytes) throws JMSException
{
- return 0; //TODO
+ Object obj = readObject();
+ if(!(obj instanceof byte[]))
+ {
+ throw new MessageFormatException("Cannot convert value of type " + obj.getClass().getName());
+ }
+ return -1;
}
public Object readObject() throws JMSException
@@ -121,7 +269,7 @@ public class StreamMessageImpl extends MessageImpl implements StreamMessage
public void writeBoolean(final boolean b) throws JMSException
{
- //TODO
+ _list.add(b);
}
public void writeByte(final byte b) throws JMSException
@@ -176,12 +324,16 @@ public class StreamMessageImpl extends MessageImpl implements StreamMessage
public void writeObject(final Object o) throws JMSException
{
- //TODO
+ if(o == null || _supportedClasses.contains(o.getClass()))
+ {
+ _list.add(o);
+ }
}
public void reset() throws JMSException
{
- //TODO
+ _position = -1;
+ _offset = -1;
}
@Override Collection<Section> getSections()