summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java243
1 files changed, 243 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
new file mode 100644
index 0000000000..0e53ef11bd
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
@@ -0,0 +1,243 @@
+package org.apache.qpid.server.protocol.converter.v0_8_v0_10;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;import java.util.Map;import org.apache.qpid.AMQPInvalidClassException;import org.apache.qpid.exchange.ExchangeDefaults;import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.exchange.Exchange;import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
+import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.DeliveryProperties;import org.apache.qpid.transport.Header;import org.apache.qpid.transport.MessageDeliveryMode;import org.apache.qpid.transport.MessageProperties;import org.apache.qpid.transport.ReplyTo;
+
+public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTransferMessage, AMQMessage>
+{
+ private static final int BASIC_CLASS_ID = 60;public static BasicContentHeaderProperties convertContentHeaderProperties(MessageTransferMessage messageTransferMessage,
+ VirtualHost vhost)
+ {
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ Header header = messageTransferMessage.getHeader();
+ DeliveryProperties deliveryProps = header.getDeliveryProperties();
+ MessageProperties messageProps = header.getMessageProperties();
+
+ if(deliveryProps != null)
+ {
+ if(deliveryProps.hasDeliveryMode())
+ {
+ props.setDeliveryMode((byte)(deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT ? BasicContentHeaderProperties.PERSISTENT : BasicContentHeaderProperties.NON_PERSISTENT));
+ }
+ if(deliveryProps.hasExpiration())
+ {
+ props.setExpiration(deliveryProps.getExpiration());
+ }
+ if(deliveryProps.hasPriority())
+ {
+ props.setPriority((byte)deliveryProps.getPriority().getValue());
+ }
+ if(deliveryProps.hasTimestamp())
+ {
+ props.setTimestamp(deliveryProps.getTimestamp());
+ }
+ }
+ if(messageProps != null)
+ {
+ if(messageProps.hasAppId())
+ {
+ props.setAppId(new AMQShortString(messageProps.getAppId()));
+ }
+ if(messageProps.hasContentType())
+ {
+ props.setContentType(messageProps.getContentType());
+ }
+ if(messageProps.hasCorrelationId())
+ {
+ props.setCorrelationId(new AMQShortString(messageProps.getCorrelationId()));
+ }
+ if(messageProps.hasContentEncoding())
+ {
+ props.setEncoding(messageProps.getContentEncoding());
+ }
+ if(messageProps.hasMessageId())
+ {
+ props.setMessageId("ID:" + messageProps.getMessageId().toString());
+ }
+ if(messageProps.hasReplyTo())
+ {
+ ReplyTo replyTo = messageProps.getReplyTo();
+ String exchangeName = replyTo.getExchange();
+ String routingKey = replyTo.getRoutingKey();
+ if(exchangeName == null)
+ {
+ exchangeName = "";
+ }
+
+ Exchange exchange = vhost.getExchangeRegistry().getExchange(exchangeName);
+ String exchangeClass = exchange == null ? ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString() : exchange.getType().getName().asString();
+ props.setReplyTo(exchangeClass + "://"+exchangeName+"//?routingkey='"+(routingKey==null ? "" : routingKey+"'"));
+
+ }
+ if(messageProps.hasUserId())
+ {
+ props.setUserId(new AMQShortString(messageProps.getUserId()));
+ }
+
+ if(messageProps.hasApplicationHeaders())
+ {
+ Map<String, Object> appHeaders = new HashMap<String, Object>(messageProps.getApplicationHeaders());
+ if(messageProps.getApplicationHeaders().containsKey("x-jms-type"))
+ {
+ props.setType(String.valueOf(appHeaders.remove("x-jms-type")));
+ }
+
+ FieldTable ft = new FieldTable();
+ for(Map.Entry<String, Object> entry : appHeaders.entrySet())
+ {
+ try
+ {
+ ft.put(new AMQShortString(entry.getKey()), entry.getValue());
+ }
+ catch(AMQPInvalidClassException e)
+ {
+ // TODO
+ // log here, but ignore - just can;t convert
+ }
+ }
+ props.setHeaders(ft);
+
+ }
+ }
+
+ return props;
+ }
+
+ @Override
+ public Class<MessageTransferMessage> getInputClass()
+ {
+ return MessageTransferMessage.class;
+ }
+
+ @Override
+ public Class<AMQMessage> getOutputClass()
+ {
+ return AMQMessage.class;
+ }
+
+ @Override
+ public AMQMessage convert(MessageTransferMessage message, VirtualHost vhost)
+ {
+ return new AMQMessage(convertToStoredMessage(message, vhost));
+ }
+
+ private StoredMessage<MessageMetaData> convertToStoredMessage(final MessageTransferMessage message, VirtualHost vhost)
+ {
+ final MessageMetaData metaData = convertMetaData(message, vhost);
+ return new StoredMessage<org.apache.qpid.server.protocol.v0_8.MessageMetaData>()
+ {
+ @Override
+ public MessageMetaData getMetaData()
+ {
+ return metaData;
+ }
+
+ @Override
+ public long getMessageNumber()
+ {
+ return message.getMessageNumber();
+ }
+
+ @Override
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getContent(int offsetInMessage, ByteBuffer dst)
+ {
+ return message.getContent(dst, offsetInMessage);
+ }
+
+ @Override
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ return message.getContent(offsetInMessage,size);
+ }
+
+ @Override
+ public StoreFuture flushToStore()
+ {
+ return StoreFuture.IMMEDIATE_FUTURE;
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ private MessageMetaData convertMetaData(MessageTransferMessage message, VirtualHost vhost)
+ {
+ return new MessageMetaData(convertPublishBody(message), convertContentHeaderBody(message, vhost), 1, message.getArrivalTime());
+ }
+
+ private ContentHeaderBody convertContentHeaderBody(MessageTransferMessage message, VirtualHost vhost)
+ {
+ BasicContentHeaderProperties props = convertContentHeaderProperties(message, vhost);
+ ContentHeaderBody chb = new ContentHeaderBody(props, BASIC_CLASS_ID);
+ chb.setBodySize(message.getSize());
+ return chb;
+ }
+
+ private MessagePublishInfo convertPublishBody(MessageTransferMessage message)
+ {
+ DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
+ final AMQShortString exchangeName =(delvProps == null || delvProps.getExchange() == null)
+ ? null
+ : new AMQShortString(delvProps.getExchange());
+ final AMQShortString routingKey = (delvProps == null || delvProps.getRoutingKey() == null)
+ ? null
+ : new AMQShortString(delvProps.getRoutingKey());
+ final boolean immediate = delvProps != null && delvProps.getImmediate();
+ final boolean mandatory = delvProps != null && !delvProps.getDiscardUnroutable();
+
+ return new MessagePublishInfo()
+ {
+ @Override
+ public AMQShortString getExchange()
+ {
+ return exchangeName;
+ }
+
+ @Override
+ public void setExchange(AMQShortString exchange)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isImmediate()
+ {
+ return immediate;
+ }
+
+ @Override
+ public boolean isMandatory()
+ {
+ return mandatory;
+ }
+
+ @Override
+ public AMQShortString getRoutingKey()
+ {
+ return routingKey;
+ }
+ };
+ }
+}