diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2012-06-15 17:22:05 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2012-06-15 17:22:05 +0000 |
commit | 36d754c78ed94f696785ce168191165bde19ac88 (patch) | |
tree | 21b9fd6d2013e9916f71bcaac815e26b0a9685aa | |
parent | 03dc29735d9a20cc62df9aac4c1a9c6a197365ca (diff) | |
download | qpid-python-36d754c78ed94f696785ce168191165bde19ac88.tar.gz |
QPID-4027 Added an AMQP 0-10 message factory implementation. The
StringMessage is not really 0-10 specific. When we add AMQP 1.0 support,
perhaps we could have a AbstractMessageFactory which contains the common
stuff.
Added GenericMessageAdapter to be used as a base for all kinds of
Message adapters.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/address-refactor2@1350707 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 832 insertions, 80 deletions
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/GenericMessageAdapter.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/GenericMessageAdapter.java new file mode 100644 index 0000000000..4928f1df38 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/GenericMessageAdapter.java @@ -0,0 +1,195 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.messaging.util; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Map; + +import org.apache.qpid.messaging.Message; +import org.apache.qpid.messaging.MessageNotWritableException; +import org.apache.qpid.messaging.MessagingException; + +/** + * A generic message adapter that simply delegates + * all method calls to the underlying message delegate. + * This is not intended to be used by itself, + * rather as a base class for other adapters. For example, + * @see ReadOnlyMessageAdapter + * @see StringMessage_AMQP_0_10 + * @see MapMessage_AMQP_0_10 + */ +public class GenericMessageAdapter implements Message +{ + private Message _delegate; + + GenericMessageAdapter(Message delegate) + { + _delegate = delegate; + } + + @Override + public String getMessageId() throws MessagingException + { + return _delegate.getMessageId(); + } + + @Override + public void setMessageId(String messageId) throws MessagingException + { + _delegate.setMessageId(messageId); + } + + @Override + public String getSubject() throws MessagingException + { + return _delegate.getSubject(); + } + + @Override + public void setSubject(String subject) throws MessagingException + { + _delegate.setSubject(subject); + } + + @Override + public String getContentType() throws MessagingException + { + return _delegate.getContentType(); + } + + @Override + public void setContentType(String contentType) throws MessagingException + { + _delegate.setContentType(contentType); + } + + @Override + public String getCorrelationId() throws MessagingException + { + return _delegate.getCorrelationId(); + } + + @Override + public void setCorrelationId(String correlationId) throws MessagingException + { + _delegate.setCorrelationId(correlationId); + } + + @Override + public String getReplyTo() throws MessagingException + { + return _delegate.getReplyTo(); + } + + @Override + public void setReplyTo(String replyTo) throws MessagingException + { + _delegate.setReplyTo(replyTo); + } + + @Override + public String getUserId() throws MessagingException + { + return _delegate.getUserId(); + } + + @Override + public void setUserId(String userId) throws MessagingException + { + _delegate.setUserId(userId); + } + + @Override + public boolean isDurable() throws MessagingException + { + return _delegate.isDurable(); + } + + @Override + public void setDurable(boolean durable) throws MessagingException + { + _delegate.setDurable(durable); + } + + @Override + public boolean isRedelivered() throws MessagingException + { + return _delegate.isRedelivered(); + } + + @Override + public void setRedelivered(boolean redelivered) throws MessagingException + { + _delegate.setRedelivered(redelivered); + } + + @Override + public int getPriority() throws MessagingException + { + return _delegate.getPriority(); + } + + @Override + public void setPriority(int priority) throws MessagingException + { + _delegate.setPriority(priority); + } + + @Override + public long getTtl() throws MessagingException + { + return _delegate.getTtl(); + } + + @Override + public void setTtl(long ttl) throws MessagingException + { + _delegate.setTtl(ttl); + } + + @Override + public long getTimestamp() throws MessagingException + { + return _delegate.getTimestamp(); + } + + @Override + public void setTimestamp(long timestamp) throws MessagingException + { + _delegate.setTimestamp(timestamp); + } + + @Override + public Map<String, Object> getProperties() throws MessagingException + { + return _delegate.getProperties(); + } + + @Override + public void setProperty(String key, Object value) throws MessagingException + { + _delegate.setProperty(key, value); + } + + @Override + public ByteBuffer getContent() + { + return _delegate.getContent(); + } +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/MessageFactory_AMQP_0_10.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/MessageFactory_AMQP_0_10.java new file mode 100644 index 0000000000..330d74cd53 --- /dev/null +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/MessageFactory_AMQP_0_10.java @@ -0,0 +1,626 @@ +package org.apache.qpid.messaging.util; + +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CharsetEncoder; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.qpid.messaging.ListMessage; +import org.apache.qpid.messaging.MapMessage; +import org.apache.qpid.messaging.Message; +import org.apache.qpid.messaging.MessageEncodingException; +import org.apache.qpid.messaging.MessageFactory; +import org.apache.qpid.messaging.MessagingException; +import org.apache.qpid.messaging.StringMessage; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.MessageDeliveryMode; +import org.apache.qpid.transport.MessageDeliveryPriority; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.codec.BBDecoder; +import org.apache.qpid.transport.codec.BBEncoder; +import org.apache.qpid.util.UUIDGen; +import org.apache.qpid.util.UUIDs; + +/** + * A generic message factory that is based on the AMQO 0-10 encoding. + * + */ +public class MessageFactory_AMQP_0_10 implements MessageFactory +{ + private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8"); + private static boolean ALLOCATE_DIRECT = Boolean.getBoolean("qpid.allocate-direct"); + private static final ByteBuffer EMPTY_BYTE_BUFFER = ALLOCATE_DIRECT ? ByteBuffer.allocateDirect(0) : ByteBuffer.allocate(0); + + @Override + public Message createMessage(String text) throws MessageEncodingException + { + return new StringMessage_AMQP_0_10(new Mesage_AMQP_0_10(), text); + } + + @Override + public Message createMessage(byte[] bytes) throws MessageEncodingException + { + ByteBuffer b; + if (ALLOCATE_DIRECT) + { + b = ByteBuffer.allocateDirect(bytes.length); + b.put(bytes); + } + else + { + b = ByteBuffer.wrap(bytes); + } + return new Mesage_AMQP_0_10(b); + } + + @Override + public Message createMessage(ByteBuffer buf) throws MessageEncodingException + { + if (ALLOCATE_DIRECT) + { + if (buf.isDirect()) + { + return new Mesage_AMQP_0_10(buf); + } + else + { + // Silently copying the data to a direct buffer is not a good thing as it can + // add a perf overhead. So an exception is a more reasonable option. + throw new MessageEncodingException("The ByteBuffer needs to be direct allocated"); + } + } + else + { + return new Mesage_AMQP_0_10(buf); + } + } + + @Override + public Message createMessage(Map<String, Object> map) throws MessageEncodingException + { + return new MapMessage_AMQP_0_10(new Mesage_AMQP_0_10(), map); + } + + @Override + public Message createMessage(List<Object> list) throws MessageEncodingException + { + return new ListMessage_AMQP_0_10(new Mesage_AMQP_0_10(), list); + } + + @Override + public String getContentAsString(Message m) throws MessageEncodingException + { + if (m instanceof StringMessage) + { + return ((StringMessage)m).getString(); + } + else + { + return decodeAsString(m.getContent()); + } + } + + @Override + public Map<String, Object> getContentAsMap(Message m) throws MessageEncodingException + { + if (m instanceof MapMessage) + { + return ((MapMessage)m).getMap(); + } + else + { + return decodeAsMap(m.getContent()); + } + } + + @Override + public List<Object> getContentAsList(Message m) throws MessageEncodingException + { + if (m instanceof ListMessage) + { + return ((ListMessage)m).getList(); + } + else + { + return decodeAsList(m.getContent()); + } + } + + class Mesage_AMQP_0_10 implements Message + { + private MessageProperties _messageProps; + private DeliveryProperties _deliveryProps; + private ByteBuffer _data; + + private UUIDGen _ssnNameGenerator = UUIDs.newGenerator(); + + protected Mesage_AMQP_0_10(MessageProperties messageProps, DeliveryProperties deliveryProps) + { + this(messageProps, deliveryProps,EMPTY_BYTE_BUFFER); + } + protected Mesage_AMQP_0_10(MessageProperties messageProps, + DeliveryProperties deliveryProps, + ByteBuffer buf) + { + _messageProps = messageProps; + _deliveryProps = deliveryProps; + _data = buf; + } + + protected Mesage_AMQP_0_10() + { + _messageProps = new MessageProperties(); + _deliveryProps = new DeliveryProperties(); + } + + protected Mesage_AMQP_0_10(ByteBuffer buf) + { + _messageProps = new MessageProperties(); + _deliveryProps = new DeliveryProperties(); + _data = buf; + } + + @Override + public String getMessageId() throws MessagingException + { + return _messageProps.getMessageId().toString(); + } + + @Override + public void setMessageId(String messageId) throws MessagingException + { + // Temp hack for the time being + _messageProps.setMessageId(_ssnNameGenerator.generate()); + } + + @Override + public String getSubject() throws MessagingException + { + Map<String,Object> props = _messageProps.getApplicationHeaders(); + return props == null ? null : (String)props.get(QPID_SUBJECT); + } + + @Override + public void setSubject(String subject) throws MessagingException + { + Map<String,Object> props = _messageProps.getApplicationHeaders(); + if (props == null) + { + props = new HashMap<String,Object>(); + _messageProps.setApplicationHeaders(props); + } + props.put(QPID_SUBJECT, subject); + } + + @Override + public String getContentType() throws MessagingException + { + return _messageProps.getContentType(); + } + + @Override + public void setContentType(String contentType) + throws MessagingException + { + _messageProps.setContentType(contentType); + } + + @Override + public String getCorrelationId() throws MessagingException + { + return new String(_messageProps.getCorrelationId()); + } + + @Override + public void setCorrelationId(String correlationId) + throws MessagingException + { + _messageProps.setCorrelationId(correlationId.getBytes()); + } + + @Override + public String getReplyTo() throws MessagingException + { + return addressFrom0_10_ReplyTo(_deliveryProps.getExchange(), + _deliveryProps.getRoutingKey()); + } + + @Override + public void setReplyTo(String replyTo) throws MessagingException + { + // TODO + } + + @Override + public String getUserId() throws MessagingException + { + return new String(_messageProps.getUserId()); + } + + @Override + public void setUserId(String userId) throws MessagingException + { + _messageProps.setUserId(userId.getBytes()); + } + + @Override + public boolean isDurable() throws MessagingException + { + return _deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT; + } + + @Override + public void setDurable(boolean durable) throws MessagingException + { + _deliveryProps.setDeliveryMode(durable ? + MessageDeliveryMode.PERSISTENT: MessageDeliveryMode.NON_PERSISTENT); + } + + @Override + public boolean isRedelivered() throws MessagingException + { + return _deliveryProps.getRedelivered(); + } + + @Override + public void setRedelivered(boolean redelivered) + throws MessagingException + { + + _deliveryProps.setRedelivered(redelivered); + } + + @Override + public int getPriority() throws MessagingException + { + return _deliveryProps.getPriority().getValue(); + } + + @Override + public void setPriority(int priority) throws MessagingException + { + _deliveryProps.setPriority(MessageDeliveryPriority.get((short)priority)); + + } + + @Override + public long getTtl() throws MessagingException + { + return _deliveryProps.getTtl(); + } + + @Override + public void setTtl(long ttl) throws MessagingException + { + _deliveryProps.setTtl(ttl); + } + + @Override + public long getTimestamp() throws MessagingException + { + return _deliveryProps.getTimestamp(); + } + + @Override + public void setTimestamp(long timestamp) throws MessagingException + { + _deliveryProps.setTimestamp(timestamp); + } + + @Override + public Map<String, Object> getProperties() throws MessagingException + { + return _messageProps.getApplicationHeaders(); + } + + @Override + public void setProperty(String key, Object value) + throws MessagingException + { + Map<String,Object> props = _messageProps.getApplicationHeaders(); + if (props == null) + { + props = new HashMap<String,Object>(); + _messageProps.setApplicationHeaders(props); + } + props.put(key, value); + } + + private String addressFrom0_10_ReplyTo(String exchange, String routingKey) + { + if ("".equals(exchange)) // type Queue + { + return routingKey; + } + else + { + return exchange + "/" + routingKey; + } + } + + @Override + public ByteBuffer getContent() + { + return _data; + } + } + + class StringMessage_AMQP_0_10 extends GenericMessageAdapter implements StringMessage + { + private String _str; + private ByteBuffer _rawData; + private MessageEncodingException _exception; + + /** + * @param data The ByteBuffer passed will be read from position zero. + */ + public StringMessage_AMQP_0_10(MessageProperties messageProps, + DeliveryProperties deliveryProps, + ByteBuffer data) + { + super(new Mesage_AMQP_0_10(messageProps, deliveryProps)); + _rawData = (ByteBuffer) data.rewind(); + try + { + _str = decodeAsString(_rawData.duplicate()); + } + catch (MessageEncodingException e) + { + _exception = e; + } + } + + public StringMessage_AMQP_0_10(Message delegate, String str) throws MessageEncodingException + { + super(delegate); + if(_str == null || _str.isEmpty()) + { + _rawData = EMPTY_BYTE_BUFFER; + } + else + { + _rawData = encodeString(str); + } + } + + @Override + public String getString() throws MessageEncodingException + { + if (_exception != null) + { + throw _exception; + } + else + { + return _str; + } + } + + @Override + public ByteBuffer getContent() + { + return _rawData; + } + } + + /** + * @param data The ByteBuffer passed will be read from position zero. + */ + class MapMessage_AMQP_0_10 extends GenericMessageAdapter implements MapMessage + { + private Map<String,Object> _map; + private ByteBuffer _rawData; + private MessageEncodingException _exception; + + public MapMessage_AMQP_0_10(MessageProperties messageProps, + DeliveryProperties deliveryProps, + ByteBuffer data) + { + super(new Mesage_AMQP_0_10(messageProps, deliveryProps)); + _rawData = (ByteBuffer) data.rewind(); + try + { + _map = decodeAsMap(_rawData.duplicate()); + } + catch (MessageEncodingException e) + { + _exception = e; + } + } + + public MapMessage_AMQP_0_10(Message delegate, Map<String,Object> map) throws MessageEncodingException + { + super(delegate); + if(map == null || map.isEmpty()) + { + _rawData = EMPTY_BYTE_BUFFER; + } + else + { + _rawData = encodeMap(map); + } + } + + @Override + public Map<String,Object> getMap() throws MessageEncodingException + { + if (_exception != null) + { + throw _exception; + } + else + { + return _map; + } + } + + @Override + public ByteBuffer getContent() + { + return _rawData; + } + } + + /** + * @param data The ByteBuffer passed will be read from position zero. + */ + class ListMessage_AMQP_0_10 extends GenericMessageAdapter implements ListMessage + { + private List<Object> _list; + private ByteBuffer _rawData; + private MessageEncodingException _exception; + + public ListMessage_AMQP_0_10(MessageProperties messageProps, + DeliveryProperties deliveryProps, + ByteBuffer data) + { + super(new Mesage_AMQP_0_10(messageProps, deliveryProps)); + _rawData = (ByteBuffer) data.rewind(); + try + { + _list = decodeAsList(_rawData.duplicate()); + } + catch (MessageEncodingException e) + { + _exception = e; + } + } + + public ListMessage_AMQP_0_10(Message delegate, List<Object> list) throws MessageEncodingException + { + super(delegate); + if(list == null || list.isEmpty()) + { + _rawData = EMPTY_BYTE_BUFFER; + } + else + { + _rawData = encodeList(list); + } + } + + @Override + public List<Object> getList() throws MessageEncodingException + { + if (_exception != null) + { + throw _exception; + } + else + { + return _list; + } + } + + @Override + public ByteBuffer getContent() + { + return _rawData; + } + } + + protected static String decodeAsString(ByteBuffer buf) throws MessageEncodingException + { + final CharsetDecoder decoder = DEFAULT_CHARSET.newDecoder(); + try + { + return decoder.decode(buf).toString(); + } + catch (CharacterCodingException e) + { + throw new MessageEncodingException("Error decoding content as String using UTF-8",e); + } + + } + + protected static ByteBuffer encodeString(String str) throws MessageEncodingException + { + final CharsetEncoder encoder = DEFAULT_CHARSET.newEncoder(); + ByteBuffer b; + try + { + b = encoder.encode(CharBuffer.wrap(str)); + b.flip(); + } + catch (CharacterCodingException e) + { + throw new MessageEncodingException("Cannot encode string in UFT-8: " + str,e); + } + if (ALLOCATE_DIRECT) + { + // unfortunately this extra copy is required as it does not seem possible + // to create a CharSetEncoder that returns a buffer allocated directly. + ByteBuffer direct = ByteBuffer.allocateDirect(b.remaining()); + direct.put(b); + direct.flip(); + return direct; + } + else + { + return b; + } + } + + protected static Map<String,Object> decodeAsMap(ByteBuffer buf) throws MessageEncodingException + { + try + { + BBDecoder decorder = new BBDecoder(); + decorder.init(buf); + return decorder.readMap(); + } + catch (Exception e) + { + throw new MessageEncodingException("Error decoding content as Map",e); + } + } + + protected static ByteBuffer encodeMap(Map<String,Object> map) throws MessageEncodingException + { + try + { + //need to investigate the capacity here. + BBEncoder encoder = new BBEncoder(1024); + encoder.writeMap(map); + return (ByteBuffer)encoder.buffer().flip(); + } + catch (Exception e) + { + throw new MessageEncodingException("Cannot encode Map" ,e); + } + } + + protected static List<Object> decodeAsList(ByteBuffer buf) throws MessageEncodingException + { + try + { + BBDecoder decorder = new BBDecoder(); + decorder.init(buf); + return decorder.readList(); + } + catch (Exception e) + { + throw new MessageEncodingException("Error decoding content as List",e); + } + } + + protected static ByteBuffer encodeList(List<Object> list) throws MessageEncodingException + { + try + { + //need to investigate the capacity here. + BBEncoder encoder = new BBEncoder(1024); + encoder.writeList(list); + return (ByteBuffer)encoder.buffer().flip(); + } + catch (Exception e) + { + throw new MessageEncodingException("Cannot encode List" ,e); + } + } +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReadOnlyMessageAdapter.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReadOnlyMessageAdapter.java index d0c359a592..1c93680a7f 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReadOnlyMessageAdapter.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReadOnlyMessageAdapter.java @@ -17,6 +17,7 @@ */ package org.apache.qpid.messaging.util; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.Map; @@ -25,28 +26,13 @@ import org.apache.qpid.messaging.MessageNotWritableException; import org.apache.qpid.messaging.MessagingException; /** - * Ensures the message is read only by blocking the delegates - * setter methods. + * Ensures the message properties and content are read only. */ -public class ReadOnlyMessageAdapter implements Message +public class ReadOnlyMessageAdapter extends GenericMessageAdapter { - private Message _delegate; - ReadOnlyMessageAdapter(Message delegate) { - _delegate = delegate; - } - - @Override - public Object getContent() throws MessagingException - { - return _delegate.getContent(); - } - - @Override - public String getMessageId() throws MessagingException - { - return _delegate.getMessageId(); + super(delegate); } @Override @@ -56,120 +42,60 @@ public class ReadOnlyMessageAdapter implements Message } @Override - public String getSubject() throws MessagingException - { - return _delegate.getSubject(); - } - - @Override public void setSubject(String subject) throws MessagingException { throwMessageNotWritableException(); } @Override - public String getContentType() throws MessagingException - { - return _delegate.getContentType(); - } - - @Override public void setContentType(String contentType) throws MessagingException { throwMessageNotWritableException(); } @Override - public String getCorrelationId() throws MessagingException - { - return _delegate.getCorrelationId(); - } - - @Override public void setCorrelationId(String correlationId) throws MessagingException { throwMessageNotWritableException(); } @Override - public String getReplyTo() throws MessagingException - { - return _delegate.getReplyTo(); - } - - @Override public void setReplyTo(String replyTo) throws MessagingException { throwMessageNotWritableException(); } @Override - public String getUserId() throws MessagingException - { - return _delegate.getUserId(); - } - - @Override public void setUserId(String userId) throws MessagingException { throwMessageNotWritableException(); } @Override - public boolean isDurable() throws MessagingException - { - return _delegate.isDurable(); - } - - @Override public void setDurable(boolean durable) throws MessagingException { throwMessageNotWritableException(); } @Override - public boolean isRedelivered() throws MessagingException - { - return _delegate.isRedelivered(); - } - - @Override public void setRedelivered(boolean redelivered) throws MessagingException { throwMessageNotWritableException(); } @Override - public int getPriority() throws MessagingException - { - return _delegate.getPriority(); - } - - @Override public void setPriority(int priority) throws MessagingException { throwMessageNotWritableException(); } @Override - public long getTtl() throws MessagingException - { - return _delegate.getTtl(); - } - - @Override public void setTtl(long ttl) throws MessagingException { throwMessageNotWritableException(); } @Override - public long getTimestamp() throws MessagingException - { - return _delegate.getTimestamp(); - } - - @Override public void setTimestamp(long timestamp) throws MessagingException { throwMessageNotWritableException(); @@ -178,7 +104,7 @@ public class ReadOnlyMessageAdapter implements Message @Override public Map<String, Object> getProperties() throws MessagingException { - return Collections.unmodifiableMap(_delegate.getProperties()); + return Collections.unmodifiableMap(super.getProperties()); } @Override @@ -187,9 +113,14 @@ public class ReadOnlyMessageAdapter implements Message throwMessageNotWritableException(); } + @Override + public ByteBuffer getContent() + { + return super.getContent().asReadOnlyBuffer(); + } + private void throwMessageNotWritableException() throws MessageNotWritableException { throw new MessageNotWritableException("Message is read-only"); } - } |