summaryrefslogtreecommitdiff
path: root/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java')
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java44
1 files changed, 38 insertions, 6 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
index cd4381b898..6eb150924c 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
@@ -33,6 +33,7 @@ import org.apache.qpid.amqp_1_0.type.UnsignedShort;
import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
import org.apache.qpid.amqp_1_0.type.messaging.Footer;
import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations;
import org.apache.qpid.amqp_1_0.type.messaging.Properties;
import javax.jms.DeliveryMode;
@@ -49,6 +50,7 @@ public abstract class MessageImpl implements Message
static final Set<Class> _supportedClasses =
new HashSet<Class>(Arrays.asList(Boolean.class, Byte.class, Short.class, Integer.class, Long.class,
Float.class, Double.class, Character.class, String.class, byte[].class));
+ private static final Symbol JMS_TYPE = Symbol.valueOf("apache.qpid.amqp_1_0-jms-type");
private Header _header;
private Properties _properties;
@@ -57,8 +59,13 @@ public abstract class MessageImpl implements Message
public static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
private SessionImpl _sessionImpl;
private boolean _readOnly;
+ private MessageAnnotations _messageAnnotations;
+
+ private boolean _isFromQueue;
+ private boolean _isFromTopic;
protected MessageImpl(Header header,
+ MessageAnnotations messageAnnotations,
Properties properties,
ApplicationProperties appProperties,
Footer footer,
@@ -66,6 +73,7 @@ public abstract class MessageImpl implements Message
{
_header = header == null ? new Header() : header;
_properties = properties == null ? new Properties() : properties;
+ _messageAnnotations = messageAnnotations == null ? new MessageAnnotations(new HashMap()) : messageAnnotations;
_footer = footer == null ? new Footer(Collections.EMPTY_MAP) : footer;
_applicationProperties = appProperties == null ? new ApplicationProperties(new HashMap()) : appProperties;
_sessionImpl = session;
@@ -170,7 +178,9 @@ public abstract class MessageImpl implements Message
public DestinationImpl getJMSDestination() throws JMSException
{
- return DestinationImpl.valueOf(getTo());
+ return _isFromQueue ? QueueImpl.valueOf(getTo())
+ : _isFromTopic ? TopicImpl.valueOf(getTo())
+ : DestinationImpl.valueOf(getTo());
}
public void setJMSDestination(Destination destination) throws NonAMQPDestinationException
@@ -240,20 +250,22 @@ public abstract class MessageImpl implements Message
public String getJMSType() throws JMSException
{
- final MessageAttributes messageAttrs = getHeaderMessageAttrs();
- final Object attrValue = messageAttrs == null ? null : messageAttrs.get(Symbol.valueOf("apache.qpid.amqp_1_0-jms-type"));
+ Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
+ final Object attrValue = messageAttrs == null ? null : messageAttrs.get(JMS_TYPE);
return attrValue instanceof String ? attrValue.toString() : null;
}
public void setJMSType(String s) throws JMSException
{
- MessageAttributes messageAttrs = getHeaderMessageAttrs();
+ Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
if(messageAttrs == null)
{
- // TODO - Solve MessageAttrs problem
- messageAttrs = null;
+ messageAttrs = new HashMap();
+ _messageAnnotations = new MessageAnnotations(messageAttrs);
}
+
+ messageAttrs.put(JMS_TYPE, s);
}
public long getJMSExpiration() throws JMSException
@@ -1056,10 +1068,30 @@ public abstract class MessageImpl implements Message
return _footer;
}
+ MessageAnnotations getMessageAnnotations()
+ {
+ return _messageAnnotations;
+ }
+
public ApplicationProperties getApplicationProperties()
{
return _applicationProperties;
}
+ public void reset() throws JMSException
+ {
+ _readOnly = true;
+ }
+
+ void setFromQueue(final boolean fromQueue)
+ {
+ _isFromQueue = fromQueue;
+ }
+
+ void setFromTopic(final boolean fromTopic)
+ {
+ _isFromTopic = fromTopic;
+ }
+
abstract Collection<Section> getSections();
}