summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java')
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java4
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java24
-rwxr-xr-xqpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java11
3 files changed, 29 insertions, 10 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index bceae85896..918a890af5 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -112,10 +112,12 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
}
}
- public void send(MessageInstance entry, boolean batch)
+ public long send(MessageInstance entry, boolean batch)
{
// TODO
+ long size = entry.getMessage().getSize();
send(entry);
+ return size;
}
public void flushBatched()
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
index 5b9bdc7244..3572b98cad 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
@@ -32,6 +32,7 @@ import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.Symbol;
import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
import org.apache.qpid.amqp_1_0.type.messaging.Data;
@@ -43,6 +44,7 @@ import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.codec.BBDecoder;
import org.apache.qpid.typedmessage.TypedBytesContentReader;
import org.apache.qpid.typedmessage.TypedBytesFormatException;
+import org.apache.qpid.util.GZIPUtils;
public abstract class MessageConverter_to_1_0<M extends ServerMessage> implements MessageConverter<M, Message_1_0>
{
@@ -202,7 +204,19 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
SectionEncoder sectionEncoder)
{
final String mimeType = serverMessage.getMessageHeader().getMimeType();
- Section bodySection = getBodySection(serverMessage, mimeType);
+ byte[] data = new byte[(int) serverMessage.getSize()];
+ serverMessage.getContent(ByteBuffer.wrap(data), 0);
+ byte[] uncompressed;
+
+ if(Symbol.valueOf(GZIPUtils.GZIP_CONTENT_ENCODING).equals(metaData.getPropertiesSection().getContentEncoding())
+ && (uncompressed = GZIPUtils.uncompressBufferToArray(ByteBuffer.wrap(data)))!=null)
+ {
+ data = uncompressed;
+ metaData.getPropertiesSection().setContentEncoding(null);
+ }
+
+
+ Section bodySection = convertMessageBody(mimeType, data);
final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection, sectionEncoder);
@@ -279,14 +293,6 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
};
}
- protected Section getBodySection(final M serverMessage, final String mimeType)
- {
- byte[] data = new byte[(int) serverMessage.getSize()];
- serverMessage.getContent(ByteBuffer.wrap(data), 0);
-
- return convertMessageBody(mimeType, data);
- }
-
private ByteBuffer encodeConvertedMessage(MessageMetaData_1_0 metaData, Section bodySection, SectionEncoder sectionEncoder)
{
int headerSize = (int) metaData.getStorableSize();
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
index 4540308f61..fbc24ba454 100755
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
@@ -72,6 +72,17 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData
this(sections, encodeSections(sections, encoder));
}
+ public Properties getPropertiesSection()
+ {
+ return _properties;
+ }
+
+
+ public Header getHeaderSection()
+ {
+ return _header;
+ }
+
private static ArrayList<ByteBuffer> encodeSections(final List<Section> sections, final SectionEncoder encoder)
{
ArrayList<ByteBuffer> encodedSections = new ArrayList<ByteBuffer>(sections.size());