summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java')
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java47
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java8
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java22
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java21
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java121
7 files changed, 174 insertions, 53 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 1c264e52c6..c193491e1e 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -103,7 +103,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private String _clientProduct = null;
private String _remoteProcessPid = null;
- private VirtualHostImpl _virtualHost;
+ private VirtualHostImpl<?,?,?> _virtualHost;
private final Map<Integer, AMQChannel<AMQProtocolEngine>> _channelMap =
new HashMap<Integer, AMQChannel<AMQProtocolEngine>>();
@@ -175,6 +175,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private volatile boolean _stopped;
private long _readBytes;
private boolean _authenticated;
+ private boolean _compressionSupported;
+ private int _messageCompressionThreshold;
public AMQProtocolEngine(Broker broker,
final NetworkConnection network,
@@ -208,7 +210,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return null;
}
});
-
+
_messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID());
_dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID());
_messagesReceived = new StatisticsCounter("messages-received-" + getSessionID());
@@ -539,6 +541,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_broker.getName());
serverProperties.setString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE,
String.valueOf(_closeWhenNoRoute));
+ serverProperties.setString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED,
+ String.valueOf(_broker.isMessageCompressionEnabled()));
AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(),
(short) pv.getActualMinorVersion(),
@@ -1131,6 +1135,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_logger.debug("Client set closeWhenNoRoute=" + _closeWhenNoRoute + " for protocol engine " + this);
}
}
+ String compressionSupported = clientProperties.getString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED);
+ if (compressionSupported != null)
+ {
+ _compressionSupported = Boolean.parseBoolean(compressionSupported);
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Client set compressionSupported=" + _compressionSupported + " for protocol engine " + this);
+ }
+ }
_clientVersion = clientProperties.getString(ConnectionStartProperties.VERSION_0_8);
_clientProduct = clientProperties.getString(ConnectionStartProperties.PRODUCT);
@@ -1181,17 +1194,24 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return getMethodRegistry();
}
- public VirtualHostImpl getVirtualHost()
+ public VirtualHostImpl<?,?,?> getVirtualHost()
{
return _virtualHost;
}
- public void setVirtualHost(VirtualHostImpl virtualHost) throws AMQException
+ public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost) throws AMQException
{
_virtualHost = virtualHost;
_virtualHost.getConnectionRegistry().registerConnection(this);
+
+ _messageCompressionThreshold = virtualHost.getContextValue(Integer.class,
+ Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE);
+ if(_messageCompressionThreshold <= 0)
+ {
+ _messageCompressionThreshold = Integer.MAX_VALUE;
+ }
}
public void addDeleteTask(Action<? super AMQProtocolEngine> task)
@@ -1595,15 +1615,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
@Override
- public void deliverToClient(final ConsumerImpl sub, final ServerMessage message,
+ public long deliverToClient(final ConsumerImpl sub, final ServerMessage message,
final InstanceProperties props, final long deliveryTag)
{
- registerMessageDelivered(message.getSize());
- _protocolOutputConverter.writeDeliver(message,
+ long size = _protocolOutputConverter.writeDeliver(message,
props,
_channelId,
deliveryTag,
new AMQShortString(sub.getName()));
+ registerMessageDelivered(size);
+ return size;
}
}
@@ -1636,6 +1657,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return _closeWhenNoRoute;
}
+ @Override
+ public boolean isCompressionSupported()
+ {
+ return _compressionSupported && _broker.isMessageCompressionEnabled();
+ }
+
+ @Override
+ public int getMessageCompressionThreshold()
+ {
+ return _messageCompressionThreshold;
+ }
+
public EventLogger getEventLogger()
{
if(_virtualHost != null)
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
index bab0aaf3da..8d5142338a 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
@@ -174,9 +174,9 @@ public interface AMQProtocolSession<T extends AMQProtocolSession<T>>
Object getReference();
- VirtualHostImpl getVirtualHost();
+ VirtualHostImpl<?,?,?> getVirtualHost();
- void setVirtualHost(VirtualHostImpl virtualHost) throws AMQException;
+ void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost) throws AMQException;
public ProtocolOutputConverter getProtocolOutputConverter();
@@ -210,4 +210,8 @@ public interface AMQProtocolSession<T extends AMQProtocolSession<T>>
* can't be routed rather than returning the message.
*/
boolean isCloseWhenNoRoute();
+
+ boolean isCompressionSupported();
+
+ int getMessageCompressionThreshold();
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
index fa26a73f93..c7871e8b9a 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
@@ -26,6 +26,6 @@ import org.apache.qpid.server.message.ServerMessage;
public interface ClientDeliveryMethod
{
- void deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props,
+ long deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props,
final long deliveryTag);
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index 7c2efe64e6..d5eed242e7 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -116,7 +116,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
* @throws org.apache.qpid.AMQException
*/
@Override
- public void send(MessageInstance entry, boolean batch)
+ public long send(MessageInstance entry, boolean batch)
{
// We don't decrement the reference here as we don't want to consume the message
// but we do want to send it to the client.
@@ -124,7 +124,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
synchronized (getChannel())
{
long deliveryTag = getChannel().getNextDeliveryTag();
- sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+ return sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
}
}
@@ -177,7 +177,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
* @param batch
*/
@Override
- public void send(MessageInstance entry, boolean batch)
+ public long send(MessageInstance entry, boolean batch)
{
// if we do not need to wait for client acknowledgements
// we can decrement the reference count immediately.
@@ -194,17 +194,17 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
MessageReference ref = message.newReference();
InstanceProperties props = entry.getInstanceProperties();
entry.delete();
-
+ long size;
synchronized (getChannel())
{
getChannel().getProtocolSession().setDeferFlush(batch);
long deliveryTag = getChannel().getNextDeliveryTag();
- sendToClient(message, props, deliveryTag);
+ size = sendToClient(message, props, deliveryTag);
}
ref.release();
-
+ return size;
}
@@ -291,7 +291,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
* @param batch
*/
@Override
- public void send(MessageInstance entry, boolean batch)
+ public long send(MessageInstance entry, boolean batch)
{
@@ -303,9 +303,9 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
addUnacknowledgedMessage(entry);
recordMessageDelivery(entry, deliveryTag);
entry.addStateChangeListener(getReleasedStateChangeListener());
- sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+ long size = sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
entry.incrementDeliveryCount();
-
+ return size;
}
}
@@ -502,9 +502,9 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
}
}
- protected void sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag)
+ protected long sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag)
{
- _deliveryMethod.deliverToClient(getConsumer(), message, props, deliveryTag);
+ return _deliveryMethod.deliverToClient(getConsumer(), message, props, deliveryTag);
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
index 0026bad063..c3bdedf44d 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
@@ -21,6 +21,9 @@
package org.apache.qpid.server.protocol.v0_8.handler;
+import java.security.AccessControlException;
+import java.util.EnumSet;
+
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -30,26 +33,23 @@ import org.apache.qpid.framing.BasicGetEmptyBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.MessageOnlyCreditManager;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.flow.MessageOnlyCreditManager;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
+import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8;
-import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
-import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import java.security.AccessControlException;
-import java.util.EnumSet;
-
public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody>
{
private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class);
@@ -202,17 +202,18 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
}
@Override
- public void deliverToClient(final ConsumerImpl sub, final ServerMessage message,
+ public long deliverToClient(final ConsumerImpl sub, final ServerMessage message,
final InstanceProperties props, final long deliveryTag)
{
_singleMessageCredit.useCreditForMessage(message.getSize());
- _session.getProtocolOutputConverter().writeGetOk(message,
+ long size =_session.getProtocolOutputConverter().writeGetOk(message,
props,
_channel.getChannelId(),
deliveryTag,
_queue.getQueueDepthMessages());
_deliveredMessage = true;
+ return size;
}
public boolean hasDeliveredMessage()
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java
index 7678ce812b..4ee5cbc17d 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java
@@ -26,7 +26,6 @@
*/
package org.apache.qpid.server.protocol.v0_8.output;
-import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -35,7 +34,6 @@ import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueEntry;
public interface ProtocolOutputConverter
{
@@ -46,12 +44,12 @@ public interface ProtocolOutputConverter
ProtocolOutputConverter newInstance(AMQProtocolSession session);
}
- void writeDeliver(final ServerMessage msg,
+ long writeDeliver(final ServerMessage msg,
final InstanceProperties props, int channelId,
long deliveryTag,
AMQShortString consumerTag);
- void writeGetOk(final ServerMessage msg,
+ long writeGetOk(final ServerMessage msg,
final InstanceProperties props,
int channelId,
long deliveryTag,
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java
index f786cb113a..9e41f7884c 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.server.protocol.v0_8.output;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
@@ -27,6 +31,7 @@ import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicCancelOkBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.BasicGetOkBody;
import org.apache.qpid.framing.BasicReturnBody;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -34,16 +39,13 @@ import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
-import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
+import org.apache.qpid.util.GZIPUtils;
class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
@@ -51,6 +53,7 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
private final MethodRegistry _methodRegistry;
private final AMQProtocolSession _protocolSession;
+ private static final AMQShortString GZIP_ENCODING = AMQShortString.valueOf(GZIPUtils.GZIP_CONTENT_ENCODING);
ProtocolOutputConverterImpl(AMQProtocolSession session, MethodRegistry methodRegistry)
{
@@ -64,7 +67,7 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return _protocolSession;
}
- public void writeDeliver(final ServerMessage m,
+ public long writeDeliver(final ServerMessage m,
final InstanceProperties props, int channelId,
long deliveryTag,
AMQShortString consumerTag)
@@ -72,7 +75,7 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
final AMQMessage msg = convertToAMQMessage(m);
final boolean isRedelivered = Boolean.TRUE.equals(props.getProperty(InstanceProperties.Property.REDELIVERED));
AMQBody deliverBody = createEncodedDeliverBody(msg, isRedelivered, deliveryTag, consumerTag);
- writeMessageDelivery(msg, channelId, deliverBody);
+ return writeMessageDelivery(msg, channelId, deliverBody);
}
private AMQMessage convertToAMQMessage(ServerMessage serverMessage)
@@ -93,21 +96,97 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return MessageConverterRegistry.getConverter(clazz, AMQMessage.class);
}
- private void writeMessageDelivery(AMQMessage message, int channelId, AMQBody deliverBody)
+ private long writeMessageDelivery(AMQMessage message, int channelId, AMQBody deliverBody)
{
- writeMessageDelivery(message, message.getContentHeaderBody(), channelId, deliverBody);
+ return writeMessageDelivery(message, message.getContentHeaderBody(), channelId, deliverBody);
}
- private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
+ private long writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
{
-
int bodySize = (int) message.getSize();
+ boolean msgCompressed = isCompressed(contentHeaderBody);
+ byte[] modifiedContent;
+
+ // straight through case
+ boolean compressionSupported = _protocolSession.isCompressionSupported();
+
+ if(msgCompressed && !compressionSupported &&
+ (modifiedContent = GZIPUtils.uncompressBufferToArray(message.getContent(0,bodySize))) != null)
+ {
+ BasicContentHeaderProperties modifiedProps =
+ new BasicContentHeaderProperties(contentHeaderBody.getProperties());
+ modifiedProps.setEncoding((String)null);
+
+ writeMessageDeliveryModified(channelId, deliverBody, modifiedProps, modifiedContent);
+
+ return modifiedContent.length;
+ }
+ else if(!msgCompressed
+ && compressionSupported
+ && contentHeaderBody.getProperties().getEncoding()==null
+ && bodySize > _protocolSession.getMessageCompressionThreshold()
+ && (modifiedContent = GZIPUtils.compressBufferToArray(message.getContent(0, bodySize))) != null)
+ {
+ BasicContentHeaderProperties modifiedProps =
+ new BasicContentHeaderProperties(contentHeaderBody.getProperties());
+ modifiedProps.setEncoding(GZIP_ENCODING);
+
+ writeMessageDeliveryModified(channelId, deliverBody, modifiedProps, modifiedContent);
+
+ return modifiedContent.length;
+ }
+ else
+ {
+ writeMessageDeliveryUnchanged(message, contentHeaderBody, channelId, deliverBody, bodySize);
+
+ return bodySize;
+ }
+ }
- if(bodySize == 0)
+ private int writeMessageDeliveryModified(final int channelId,
+ final AMQBody deliverBody,
+ final BasicContentHeaderProperties modifiedProps,
+ final byte[] content)
+ {
+ final int bodySize;
+ bodySize = content.length;
+ ContentHeaderBody modifiedHeaderBody =
+ new ContentHeaderBody(BASIC_CLASS_ID, 0, modifiedProps, bodySize);
+ final MessageContentSource wrappedSource = new MessageContentSource()
+ {
+ @Override
+ public int getContent(final ByteBuffer buf, final int offset)
+ {
+ int size = Math.min(buf.remaining(), content.length - offset);
+ buf.put(content, offset, size);
+ return size;
+ }
+
+ @Override
+ public ByteBuffer getContent(final int offset, final int size)
+ {
+ return ByteBuffer.wrap(content, offset, size);
+ }
+
+ @Override
+ public long getSize()
+ {
+ return content.length;
+ }
+ };
+ writeMessageDeliveryUnchanged(wrappedSource, modifiedHeaderBody, channelId, deliverBody, bodySize);
+ return bodySize;
+ }
+
+ private void writeMessageDeliveryUnchanged(final MessageContentSource message,
+ final ContentHeaderBody contentHeaderBody,
+ final int channelId, final AMQBody deliverBody, final int bodySize)
+ {
+ if (bodySize == 0)
{
SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
- contentHeaderBody);
+ contentHeaderBody);
writeFrame(compositeBlock);
}
@@ -120,13 +199,14 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
int writtenSize = capacity;
- AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
+ AMQBody firstContentBody = new MessageContentSourceBody(message, 0, capacity);
CompositeAMQBodyBlock
- compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
+ compositeBlock =
+ new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
writeFrame(compositeBlock);
- while(writtenSize < bodySize)
+ while (writtenSize < bodySize)
{
capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
@@ -137,6 +217,11 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
}
+ private boolean isCompressed(final ContentHeaderBody contentHeaderBody)
+ {
+ return GZIP_ENCODING.equals(contentHeaderBody.getProperties().getEncoding());
+ }
+
private class MessageContentSourceBody implements AMQBody
{
public static final byte TYPE = 3;
@@ -186,14 +271,14 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
}
- public void writeGetOk(final ServerMessage msg,
+ public long writeGetOk(final ServerMessage msg,
final InstanceProperties props,
int channelId,
long deliveryTag,
int queueSize)
{
AMQBody deliver = createEncodedGetOkBody(msg, props, deliveryTag, queueSize);
- writeMessageDelivery(convertToAMQMessage(msg), channelId, deliver);
+ return writeMessageDelivery(convertToAMQMessage(msg), channelId, deliver);
}