diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java | 243 |
1 files changed, 243 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java new file mode 100644 index 0000000000..0e53ef11bd --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java @@ -0,0 +1,243 @@ +package org.apache.qpid.server.protocol.converter.v0_8_v0_10; + +import java.nio.ByteBuffer; +import java.util.HashMap;import java.util.Map;import org.apache.qpid.AMQPInvalidClassException;import org.apache.qpid.exchange.ExchangeDefaults;import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable;import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.exchange.Exchange;import org.apache.qpid.server.plugin.MessageConverter; +import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage; +import org.apache.qpid.server.protocol.v0_8.AMQMessage; +import org.apache.qpid.server.protocol.v0_8.MessageMetaData; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.DeliveryProperties;import org.apache.qpid.transport.Header;import org.apache.qpid.transport.MessageDeliveryMode;import org.apache.qpid.transport.MessageProperties;import org.apache.qpid.transport.ReplyTo; + +public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTransferMessage, AMQMessage> +{ + private static final int BASIC_CLASS_ID = 60;public static BasicContentHeaderProperties convertContentHeaderProperties(MessageTransferMessage messageTransferMessage, + VirtualHost vhost) + { + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + + Header header = messageTransferMessage.getHeader(); + DeliveryProperties deliveryProps = header.getDeliveryProperties(); + MessageProperties messageProps = header.getMessageProperties(); + + if(deliveryProps != null) + { + if(deliveryProps.hasDeliveryMode()) + { + props.setDeliveryMode((byte)(deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT ? BasicContentHeaderProperties.PERSISTENT : BasicContentHeaderProperties.NON_PERSISTENT)); + } + if(deliveryProps.hasExpiration()) + { + props.setExpiration(deliveryProps.getExpiration()); + } + if(deliveryProps.hasPriority()) + { + props.setPriority((byte)deliveryProps.getPriority().getValue()); + } + if(deliveryProps.hasTimestamp()) + { + props.setTimestamp(deliveryProps.getTimestamp()); + } + } + if(messageProps != null) + { + if(messageProps.hasAppId()) + { + props.setAppId(new AMQShortString(messageProps.getAppId())); + } + if(messageProps.hasContentType()) + { + props.setContentType(messageProps.getContentType()); + } + if(messageProps.hasCorrelationId()) + { + props.setCorrelationId(new AMQShortString(messageProps.getCorrelationId())); + } + if(messageProps.hasContentEncoding()) + { + props.setEncoding(messageProps.getContentEncoding()); + } + if(messageProps.hasMessageId()) + { + props.setMessageId("ID:" + messageProps.getMessageId().toString()); + } + if(messageProps.hasReplyTo()) + { + ReplyTo replyTo = messageProps.getReplyTo(); + String exchangeName = replyTo.getExchange(); + String routingKey = replyTo.getRoutingKey(); + if(exchangeName == null) + { + exchangeName = ""; + } + + Exchange exchange = vhost.getExchangeRegistry().getExchange(exchangeName); + String exchangeClass = exchange == null ? ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString() : exchange.getType().getName().asString(); + props.setReplyTo(exchangeClass + "://"+exchangeName+"//?routingkey='"+(routingKey==null ? "" : routingKey+"'")); + + } + if(messageProps.hasUserId()) + { + props.setUserId(new AMQShortString(messageProps.getUserId())); + } + + if(messageProps.hasApplicationHeaders()) + { + Map<String, Object> appHeaders = new HashMap<String, Object>(messageProps.getApplicationHeaders()); + if(messageProps.getApplicationHeaders().containsKey("x-jms-type")) + { + props.setType(String.valueOf(appHeaders.remove("x-jms-type"))); + } + + FieldTable ft = new FieldTable(); + for(Map.Entry<String, Object> entry : appHeaders.entrySet()) + { + try + { + ft.put(new AMQShortString(entry.getKey()), entry.getValue()); + } + catch(AMQPInvalidClassException e) + { + // TODO + // log here, but ignore - just can;t convert + } + } + props.setHeaders(ft); + + } + } + + return props; + } + + @Override + public Class<MessageTransferMessage> getInputClass() + { + return MessageTransferMessage.class; + } + + @Override + public Class<AMQMessage> getOutputClass() + { + return AMQMessage.class; + } + + @Override + public AMQMessage convert(MessageTransferMessage message, VirtualHost vhost) + { + return new AMQMessage(convertToStoredMessage(message, vhost)); + } + + private StoredMessage<MessageMetaData> convertToStoredMessage(final MessageTransferMessage message, VirtualHost vhost) + { + final MessageMetaData metaData = convertMetaData(message, vhost); + return new StoredMessage<org.apache.qpid.server.protocol.v0_8.MessageMetaData>() + { + @Override + public MessageMetaData getMetaData() + { + return metaData; + } + + @Override + public long getMessageNumber() + { + return message.getMessageNumber(); + } + + @Override + public void addContent(int offsetInMessage, ByteBuffer src) + { + throw new UnsupportedOperationException(); + } + + @Override + public int getContent(int offsetInMessage, ByteBuffer dst) + { + return message.getContent(dst, offsetInMessage); + } + + @Override + public ByteBuffer getContent(int offsetInMessage, int size) + { + return message.getContent(offsetInMessage,size); + } + + @Override + public StoreFuture flushToStore() + { + return StoreFuture.IMMEDIATE_FUTURE; + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + }; + } + + private MessageMetaData convertMetaData(MessageTransferMessage message, VirtualHost vhost) + { + return new MessageMetaData(convertPublishBody(message), convertContentHeaderBody(message, vhost), 1, message.getArrivalTime()); + } + + private ContentHeaderBody convertContentHeaderBody(MessageTransferMessage message, VirtualHost vhost) + { + BasicContentHeaderProperties props = convertContentHeaderProperties(message, vhost); + ContentHeaderBody chb = new ContentHeaderBody(props, BASIC_CLASS_ID); + chb.setBodySize(message.getSize()); + return chb; + } + + private MessagePublishInfo convertPublishBody(MessageTransferMessage message) + { + DeliveryProperties delvProps = message.getHeader().getDeliveryProperties(); + final AMQShortString exchangeName =(delvProps == null || delvProps.getExchange() == null) + ? null + : new AMQShortString(delvProps.getExchange()); + final AMQShortString routingKey = (delvProps == null || delvProps.getRoutingKey() == null) + ? null + : new AMQShortString(delvProps.getRoutingKey()); + final boolean immediate = delvProps != null && delvProps.getImmediate(); + final boolean mandatory = delvProps != null && !delvProps.getDiscardUnroutable(); + + return new MessagePublishInfo() + { + @Override + public AMQShortString getExchange() + { + return exchangeName; + } + + @Override + public void setExchange(AMQShortString exchange) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isImmediate() + { + return immediate; + } + + @Override + public boolean isMandatory() + { + return mandatory; + } + + @Override + public AMQShortString getRoutingKey() + { + return routingKey; + } + }; + } +} |