summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java576
1 files changed, 576 insertions, 0 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
new file mode 100644
index 0000000000..cec4268a7b
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
@@ -0,0 +1,576 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.client.message;
+
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageNotWriteableException;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.client.JMSAMQException;
+import org.apache.qpid.collections.ReferenceMap;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderProperties;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
+
+public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
+{
+ private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
+
+ public static final String JMS_TYPE = "x-jms-type";
+
+
+ private boolean _readableProperties = false;
+
+ private Destination _destination;
+ private JMSHeaderAdapter _headerAdapter;
+ private static final boolean STRICT_AMQP_COMPLIANCE =
+ Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
+
+ private ContentHeaderProperties _contentHeaderProperties;
+ /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
+ private AMQSession _session;
+ private final long _deliveryTag;
+
+ // The base set of items that needs to be set.
+ private AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, long deliveryTag)
+ {
+ _contentHeaderProperties = properties;
+ _deliveryTag = deliveryTag;
+ _readableProperties = (_contentHeaderProperties != null);
+ _headerAdapter = new JMSHeaderAdapter(_readableProperties ? ((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()
+ : (new BasicContentHeaderProperties()).getHeaders() );
+ }
+
+ // Used for the creation of new messages
+ protected AMQMessageDelegate_0_8()
+ {
+ this(new BasicContentHeaderProperties(), -1);
+ _readableProperties = false;
+ _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
+
+ }
+
+ // Used when generating a received message object
+ protected AMQMessageDelegate_0_8(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
+ AMQShortString routingKey)
+ {
+ this(contentHeader, deliveryTag);
+
+ Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
+
+ AMQDestination dest = null;
+
+ // If we have a type set the attempt to use that.
+ if (type != null)
+ {
+ switch (type.intValue())
+ {
+ case AMQDestination.QUEUE_TYPE:
+ dest = new AMQQueue(exchange, routingKey, routingKey);
+ break;
+ case AMQDestination.TOPIC_TYPE:
+ dest = new AMQTopic(exchange, routingKey, null);
+ break;
+ default:
+ // Use the generateDestination method
+ dest = null;
+ }
+ }
+
+ if (dest == null)
+ {
+ dest = generateDestination(exchange, routingKey);
+ }
+
+ setJMSDestination(dest);
+ }
+
+
+
+ public String getJMSMessageID() throws JMSException
+ {
+ return getContentHeaderProperties().getMessageIdAsString();
+ }
+
+ public void setJMSMessageID(String messageId) throws JMSException
+ {
+ if (messageId != null)
+ {
+ getContentHeaderProperties().setMessageId(messageId);
+ }
+ }
+
+ public void setJMSMessageID(UUID messageId) throws JMSException
+ {
+ if (messageId != null)
+ {
+ getContentHeaderProperties().setMessageId("ID:" + messageId);
+ }
+ }
+
+
+ public long getJMSTimestamp() throws JMSException
+ {
+ return getContentHeaderProperties().getTimestamp();
+ }
+
+ public void setJMSTimestamp(long timestamp) throws JMSException
+ {
+ getContentHeaderProperties().setTimestamp(timestamp);
+ }
+
+ public byte[] getJMSCorrelationIDAsBytes() throws JMSException
+ {
+ return getContentHeaderProperties().getCorrelationIdAsString().getBytes();
+ }
+
+ public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException
+ {
+ getContentHeaderProperties().setCorrelationId(new String(bytes));
+ }
+
+ public void setJMSCorrelationID(String correlationId) throws JMSException
+ {
+ getContentHeaderProperties().setCorrelationId(correlationId);
+ }
+
+ public String getJMSCorrelationID() throws JMSException
+ {
+ return getContentHeaderProperties().getCorrelationIdAsString();
+ }
+
+ public Destination getJMSReplyTo() throws JMSException
+ {
+ String replyToEncoding = getContentHeaderProperties().getReplyToAsString();
+ if (replyToEncoding == null)
+ {
+ return null;
+ }
+ else
+ {
+ Destination dest = (Destination) _destinationCache.get(replyToEncoding);
+ if (dest == null)
+ {
+ try
+ {
+ BindingURL binding = new AMQBindingURL(replyToEncoding);
+ dest = AMQDestination.createDestination(binding);
+ }
+ catch (URISyntaxException e)
+ {
+ throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e);
+ }
+
+ _destinationCache.put(replyToEncoding, dest);
+ }
+
+ return dest;
+ }
+ }
+
+ public void setJMSReplyTo(Destination destination) throws JMSException
+ {
+ if (destination == null)
+ {
+ getContentHeaderProperties().setReplyTo((String) null);
+ return; // We're done here
+ }
+
+ if (!(destination instanceof AMQDestination))
+ {
+ throw new IllegalArgumentException(
+ "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
+ }
+
+ final AMQDestination amqd = (AMQDestination) destination;
+
+ final AMQShortString encodedDestination = amqd.getEncodedName();
+ _destinationCache.put(encodedDestination, destination);
+ getContentHeaderProperties().setReplyTo(encodedDestination);
+ }
+
+ public Destination getJMSDestination() throws JMSException
+ {
+ return _destination;
+ }
+
+ public void setJMSDestination(Destination destination)
+ {
+ _destination = destination;
+ }
+
+ public void setContentType(String contentType)
+ {
+ getContentHeaderProperties().setContentType(contentType);
+ }
+
+ public String getContentType()
+ {
+ return getContentHeaderProperties().getContentTypeAsString();
+ }
+
+ public void setEncoding(String encoding)
+ {
+ getContentHeaderProperties().setEncoding(encoding);
+ }
+
+ public String getEncoding()
+ {
+ return getContentHeaderProperties().getEncodingAsString();
+ }
+
+ public String getReplyToString()
+ {
+ return getContentHeaderProperties().getReplyToAsString();
+ }
+
+ public int getJMSDeliveryMode() throws JMSException
+ {
+ return getContentHeaderProperties().getDeliveryMode();
+ }
+
+ public void setJMSDeliveryMode(int i) throws JMSException
+ {
+ getContentHeaderProperties().setDeliveryMode((byte) i);
+ }
+
+ public BasicContentHeaderProperties getContentHeaderProperties()
+ {
+ return (BasicContentHeaderProperties) _contentHeaderProperties;
+ }
+
+
+ public String getJMSType() throws JMSException
+ {
+ return getContentHeaderProperties().getTypeAsString();
+ }
+
+ public void setJMSType(String string) throws JMSException
+ {
+ getContentHeaderProperties().setType(string);
+ }
+
+ public long getJMSExpiration() throws JMSException
+ {
+ return getContentHeaderProperties().getExpiration();
+ }
+
+ public void setJMSExpiration(long l) throws JMSException
+ {
+ getContentHeaderProperties().setExpiration(l);
+ }
+
+
+
+ public boolean propertyExists(String propertyName) throws JMSException
+ {
+ return getJmsHeaders().propertyExists(propertyName);
+ }
+
+ public boolean getBooleanProperty(String propertyName) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getBoolean(propertyName);
+ }
+
+ public byte getByteProperty(String propertyName) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getByte(propertyName);
+ }
+
+ public short getShortProperty(String propertyName) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getShort(propertyName);
+ }
+
+ public int getIntProperty(String propertyName) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getInteger(propertyName);
+ }
+
+ public long getLongProperty(String propertyName) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getLong(propertyName);
+ }
+
+ public float getFloatProperty(String propertyName) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getFloat(propertyName);
+ }
+
+ public double getDoubleProperty(String propertyName) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getDouble(propertyName);
+ }
+
+ public String getStringProperty(String propertyName) throws JMSException
+ {
+ //NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below.
+ if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString()))
+ {
+ return ((BasicContentHeaderProperties) _contentHeaderProperties).getUserIdAsString();
+ }
+ else
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ return getJmsHeaders().getString(propertyName);
+ }
+ }
+
+ public Object getObjectProperty(String propertyName) throws JMSException
+ {
+ return getJmsHeaders().getObject(propertyName);
+ }
+
+ public Enumeration getPropertyNames() throws JMSException
+ {
+ return getJmsHeaders().getPropertyNames();
+ }
+
+ public void setBooleanProperty(String propertyName, boolean b) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ checkWritableProperties();
+ getJmsHeaders().setBoolean(propertyName, b);
+ }
+
+ public void setByteProperty(String propertyName, byte b) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ checkWritableProperties();
+ getJmsHeaders().setByte(propertyName, new Byte(b));
+ }
+
+ public void setShortProperty(String propertyName, short i) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ checkWritableProperties();
+ getJmsHeaders().setShort(propertyName, new Short(i));
+ }
+
+ public void setIntProperty(String propertyName, int i) throws JMSException
+ {
+ checkWritableProperties();
+ getJmsHeaders().setInteger(propertyName, new Integer(i));
+ }
+
+ public void setLongProperty(String propertyName, long l) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ checkWritableProperties();
+ getJmsHeaders().setLong(propertyName, new Long(l));
+ }
+
+ public void setFloatProperty(String propertyName, float f) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ checkWritableProperties();
+ getJmsHeaders().setFloat(propertyName, new Float(f));
+ }
+
+ public void setDoubleProperty(String propertyName, double v) throws JMSException
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
+
+ checkWritableProperties();
+ getJmsHeaders().setDouble(propertyName, new Double(v));
+ }
+
+ public void setStringProperty(String propertyName, String value) throws JMSException
+ {
+ checkWritableProperties();
+ getJmsHeaders().setString(propertyName, value);
+ }
+
+ public void setObjectProperty(String propertyName, Object object) throws JMSException
+ {
+ checkWritableProperties();
+ getJmsHeaders().setObject(propertyName, object);
+ }
+
+ public void removeProperty(String propertyName) throws JMSException
+ {
+ getJmsHeaders().remove(propertyName);
+ }
+
+
+ private JMSHeaderAdapter getJmsHeaders()
+ {
+ return _headerAdapter;
+ }
+
+ protected void checkWritableProperties() throws MessageNotWriteableException
+ {
+ if (_readableProperties)
+ {
+ throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable");
+ }
+ _contentHeaderProperties.updated();
+ }
+
+
+ public int getJMSPriority() throws JMSException
+ {
+ return getContentHeaderProperties().getPriority();
+ }
+
+ public void setJMSPriority(int i) throws JMSException
+ {
+ getContentHeaderProperties().setPriority((byte) i);
+ }
+
+ public void clearProperties() throws JMSException
+ {
+ getJmsHeaders().clear();
+
+ _readableProperties = false;
+ }
+
+
+ public void acknowledgeThis() throws JMSException
+ {
+ // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
+ // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
+ if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ if (_session.getAMQConnection().isClosed())
+ {
+ throw new javax.jms.IllegalStateException("Connection is already closed");
+ }
+
+ // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
+ // received on the session
+ _session.acknowledgeMessage(_deliveryTag, true);
+ }
+ }
+
+ public void acknowledge() throws JMSException
+ {
+ if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ _session.acknowledge();
+ }
+ }
+
+
+ /**
+ * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
+ * acknowledge()
+ *
+ * @param s the AMQ session that delivered this message
+ */
+ public void setAMQSession(AMQSession s)
+ {
+ _session = s;
+ }
+
+ public AMQSession getAMQSession()
+ {
+ return _session;
+ }
+
+ /**
+ * Get the AMQ message number assigned to this message
+ *
+ * @return the message number
+ */
+ public long getDeliveryTag()
+ {
+ return _deliveryTag;
+ }
+
+}