diff options
Diffstat (limited to 'qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache')
3 files changed, 29 insertions, 10 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index bceae85896..918a890af5 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -112,10 +112,12 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget } } - public void send(MessageInstance entry, boolean batch) + public long send(MessageInstance entry, boolean batch) { // TODO + long size = entry.getMessage().getSize(); send(entry); + return size; } public void flushBatched() diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java index 5b9bdc7244..3572b98cad 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java @@ -32,6 +32,7 @@ import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl; import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.Symbol; import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; import org.apache.qpid.amqp_1_0.type.messaging.Data; @@ -43,6 +44,7 @@ import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.transport.codec.BBDecoder; import org.apache.qpid.typedmessage.TypedBytesContentReader; import org.apache.qpid.typedmessage.TypedBytesFormatException; +import org.apache.qpid.util.GZIPUtils; public abstract class MessageConverter_to_1_0<M extends ServerMessage> implements MessageConverter<M, Message_1_0> { @@ -202,7 +204,19 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement SectionEncoder sectionEncoder) { final String mimeType = serverMessage.getMessageHeader().getMimeType(); - Section bodySection = getBodySection(serverMessage, mimeType); + byte[] data = new byte[(int) serverMessage.getSize()]; + serverMessage.getContent(ByteBuffer.wrap(data), 0); + byte[] uncompressed; + + if(Symbol.valueOf(GZIPUtils.GZIP_CONTENT_ENCODING).equals(metaData.getPropertiesSection().getContentEncoding()) + && (uncompressed = GZIPUtils.uncompressBufferToArray(ByteBuffer.wrap(data)))!=null) + { + data = uncompressed; + metaData.getPropertiesSection().setContentEncoding(null); + } + + + Section bodySection = convertMessageBody(mimeType, data); final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection, sectionEncoder); @@ -279,14 +293,6 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement }; } - protected Section getBodySection(final M serverMessage, final String mimeType) - { - byte[] data = new byte[(int) serverMessage.getSize()]; - serverMessage.getContent(ByteBuffer.wrap(data), 0); - - return convertMessageBody(mimeType, data); - } - private ByteBuffer encodeConvertedMessage(MessageMetaData_1_0 metaData, Section bodySection, SectionEncoder sectionEncoder) { int headerSize = (int) metaData.getStorableSize(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java index 4540308f61..fbc24ba454 100755 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java @@ -72,6 +72,17 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData this(sections, encodeSections(sections, encoder)); } + public Properties getPropertiesSection() + { + return _properties; + } + + + public Header getHeaderSection() + { + return _header; + } + private static ArrayList<ByteBuffer> encodeSections(final List<Section> sections, final SectionEncoder encoder) { ArrayList<ByteBuffer> encodedSections = new ArrayList<ByteBuffer>(sections.size()); |