diff options
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java')
7 files changed, 174 insertions, 53 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 1c264e52c6..c193491e1e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -103,7 +103,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private String _clientProduct = null; private String _remoteProcessPid = null; - private VirtualHostImpl _virtualHost; + private VirtualHostImpl<?,?,?> _virtualHost; private final Map<Integer, AMQChannel<AMQProtocolEngine>> _channelMap = new HashMap<Integer, AMQChannel<AMQProtocolEngine>>(); @@ -175,6 +175,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private volatile boolean _stopped; private long _readBytes; private boolean _authenticated; + private boolean _compressionSupported; + private int _messageCompressionThreshold; public AMQProtocolEngine(Broker broker, final NetworkConnection network, @@ -208,7 +210,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return null; } }); - + _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID()); _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID()); _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID()); @@ -539,6 +541,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _broker.getName()); serverProperties.setString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, String.valueOf(_closeWhenNoRoute)); + serverProperties.setString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED, + String.valueOf(_broker.isMessageCompressionEnabled())); AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(), (short) pv.getActualMinorVersion(), @@ -1131,6 +1135,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _logger.debug("Client set closeWhenNoRoute=" + _closeWhenNoRoute + " for protocol engine " + this); } } + String compressionSupported = clientProperties.getString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED); + if (compressionSupported != null) + { + _compressionSupported = Boolean.parseBoolean(compressionSupported); + if(_logger.isDebugEnabled()) + { + _logger.debug("Client set compressionSupported=" + _compressionSupported + " for protocol engine " + this); + } + } _clientVersion = clientProperties.getString(ConnectionStartProperties.VERSION_0_8); _clientProduct = clientProperties.getString(ConnectionStartProperties.PRODUCT); @@ -1181,17 +1194,24 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return getMethodRegistry(); } - public VirtualHostImpl getVirtualHost() + public VirtualHostImpl<?,?,?> getVirtualHost() { return _virtualHost; } - public void setVirtualHost(VirtualHostImpl virtualHost) throws AMQException + public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost) throws AMQException { _virtualHost = virtualHost; _virtualHost.getConnectionRegistry().registerConnection(this); + + _messageCompressionThreshold = virtualHost.getContextValue(Integer.class, + Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE); + if(_messageCompressionThreshold <= 0) + { + _messageCompressionThreshold = Integer.MAX_VALUE; + } } public void addDeleteTask(Action<? super AMQProtocolEngine> task) @@ -1595,15 +1615,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } @Override - public void deliverToClient(final ConsumerImpl sub, final ServerMessage message, + public long deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag) { - registerMessageDelivered(message.getSize()); - _protocolOutputConverter.writeDeliver(message, + long size = _protocolOutputConverter.writeDeliver(message, props, _channelId, deliveryTag, new AMQShortString(sub.getName())); + registerMessageDelivered(size); + return size; } } @@ -1636,6 +1657,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return _closeWhenNoRoute; } + @Override + public boolean isCompressionSupported() + { + return _compressionSupported && _broker.isMessageCompressionEnabled(); + } + + @Override + public int getMessageCompressionThreshold() + { + return _messageCompressionThreshold; + } + public EventLogger getEventLogger() { if(_virtualHost != null) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java index bab0aaf3da..8d5142338a 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java @@ -174,9 +174,9 @@ public interface AMQProtocolSession<T extends AMQProtocolSession<T>> Object getReference(); - VirtualHostImpl getVirtualHost(); + VirtualHostImpl<?,?,?> getVirtualHost(); - void setVirtualHost(VirtualHostImpl virtualHost) throws AMQException; + void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost) throws AMQException; public ProtocolOutputConverter getProtocolOutputConverter(); @@ -210,4 +210,8 @@ public interface AMQProtocolSession<T extends AMQProtocolSession<T>> * can't be routed rather than returning the message. */ boolean isCloseWhenNoRoute(); + + boolean isCompressionSupported(); + + int getMessageCompressionThreshold(); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java index fa26a73f93..c7871e8b9a 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java @@ -26,6 +26,6 @@ import org.apache.qpid.server.message.ServerMessage; public interface ClientDeliveryMethod { - void deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props, + long deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 7c2efe64e6..d5eed242e7 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -116,7 +116,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * @throws org.apache.qpid.AMQException */ @Override - public void send(MessageInstance entry, boolean batch) + public long send(MessageInstance entry, boolean batch) { // We don't decrement the reference here as we don't want to consume the message // but we do want to send it to the client. @@ -124,7 +124,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen synchronized (getChannel()) { long deliveryTag = getChannel().getNextDeliveryTag(); - sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); + return sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); } } @@ -177,7 +177,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * @param batch */ @Override - public void send(MessageInstance entry, boolean batch) + public long send(MessageInstance entry, boolean batch) { // if we do not need to wait for client acknowledgements // we can decrement the reference count immediately. @@ -194,17 +194,17 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen MessageReference ref = message.newReference(); InstanceProperties props = entry.getInstanceProperties(); entry.delete(); - + long size; synchronized (getChannel()) { getChannel().getProtocolSession().setDeferFlush(batch); long deliveryTag = getChannel().getNextDeliveryTag(); - sendToClient(message, props, deliveryTag); + size = sendToClient(message, props, deliveryTag); } ref.release(); - + return size; } @@ -291,7 +291,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * @param batch */ @Override - public void send(MessageInstance entry, boolean batch) + public long send(MessageInstance entry, boolean batch) { @@ -303,9 +303,9 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen addUnacknowledgedMessage(entry); recordMessageDelivery(entry, deliveryTag); entry.addStateChangeListener(getReleasedStateChangeListener()); - sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); + long size = sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); entry.incrementDeliveryCount(); - + return size; } } @@ -502,9 +502,9 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } } - protected void sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag) + protected long sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag) { - _deliveryMethod.deliverToClient(getConsumer(), message, props, deliveryTag); + return _deliveryMethod.deliverToClient(getConsumer(), message, props, deliveryTag); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index 0026bad063..c3bdedf44d 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -21,6 +21,9 @@ package org.apache.qpid.server.protocol.v0_8.handler; +import java.security.AccessControlException; +import java.util.EnumSet; + import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -30,26 +33,23 @@ import org.apache.qpid.framing.BasicGetEmptyBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.flow.FlowCreditManager; +import org.apache.qpid.server.flow.MessageOnlyCreditManager; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.flow.MessageOnlyCreditManager; import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod; import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8; -import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod; -import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import java.security.AccessControlException; -import java.util.EnumSet; - public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody> { private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class); @@ -202,17 +202,18 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB } @Override - public void deliverToClient(final ConsumerImpl sub, final ServerMessage message, + public long deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag) { _singleMessageCredit.useCreditForMessage(message.getSize()); - _session.getProtocolOutputConverter().writeGetOk(message, + long size =_session.getProtocolOutputConverter().writeGetOk(message, props, _channel.getChannelId(), deliveryTag, _queue.getQueueDepthMessages()); _deliveredMessage = true; + return size; } public boolean hasDeliveredMessage() diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java index 7678ce812b..4ee5cbc17d 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java @@ -26,7 +26,6 @@ */ package org.apache.qpid.server.protocol.v0_8.output; -import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; @@ -35,7 +34,6 @@ import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueEntry; public interface ProtocolOutputConverter { @@ -46,12 +44,12 @@ public interface ProtocolOutputConverter ProtocolOutputConverter newInstance(AMQProtocolSession session); } - void writeDeliver(final ServerMessage msg, + long writeDeliver(final ServerMessage msg, final InstanceProperties props, int channelId, long deliveryTag, AMQShortString consumerTag); - void writeGetOk(final ServerMessage msg, + long writeGetOk(final ServerMessage msg, final InstanceProperties props, int channelId, long deliveryTag, diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java index f786cb113a..9e41f7884c 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java @@ -20,6 +20,10 @@ */ package org.apache.qpid.server.protocol.v0_8.output; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; @@ -27,6 +31,7 @@ import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicCancelOkBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.BasicGetOkBody; import org.apache.qpid.framing.BasicReturnBody; import org.apache.qpid.framing.ContentHeaderBody; @@ -34,16 +39,13 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.protocol.v0_8.AMQMessage; -import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; - -import java.io.DataOutput; -import java.io.IOException; -import java.nio.ByteBuffer; +import org.apache.qpid.util.GZIPUtils; class ProtocolOutputConverterImpl implements ProtocolOutputConverter { @@ -51,6 +53,7 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter private final MethodRegistry _methodRegistry; private final AMQProtocolSession _protocolSession; + private static final AMQShortString GZIP_ENCODING = AMQShortString.valueOf(GZIPUtils.GZIP_CONTENT_ENCODING); ProtocolOutputConverterImpl(AMQProtocolSession session, MethodRegistry methodRegistry) { @@ -64,7 +67,7 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter return _protocolSession; } - public void writeDeliver(final ServerMessage m, + public long writeDeliver(final ServerMessage m, final InstanceProperties props, int channelId, long deliveryTag, AMQShortString consumerTag) @@ -72,7 +75,7 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter final AMQMessage msg = convertToAMQMessage(m); final boolean isRedelivered = Boolean.TRUE.equals(props.getProperty(InstanceProperties.Property.REDELIVERED)); AMQBody deliverBody = createEncodedDeliverBody(msg, isRedelivered, deliveryTag, consumerTag); - writeMessageDelivery(msg, channelId, deliverBody); + return writeMessageDelivery(msg, channelId, deliverBody); } private AMQMessage convertToAMQMessage(ServerMessage serverMessage) @@ -93,21 +96,97 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter return MessageConverterRegistry.getConverter(clazz, AMQMessage.class); } - private void writeMessageDelivery(AMQMessage message, int channelId, AMQBody deliverBody) + private long writeMessageDelivery(AMQMessage message, int channelId, AMQBody deliverBody) { - writeMessageDelivery(message, message.getContentHeaderBody(), channelId, deliverBody); + return writeMessageDelivery(message, message.getContentHeaderBody(), channelId, deliverBody); } - private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody) + private long writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody) { - int bodySize = (int) message.getSize(); + boolean msgCompressed = isCompressed(contentHeaderBody); + byte[] modifiedContent; + + // straight through case + boolean compressionSupported = _protocolSession.isCompressionSupported(); + + if(msgCompressed && !compressionSupported && + (modifiedContent = GZIPUtils.uncompressBufferToArray(message.getContent(0,bodySize))) != null) + { + BasicContentHeaderProperties modifiedProps = + new BasicContentHeaderProperties(contentHeaderBody.getProperties()); + modifiedProps.setEncoding((String)null); + + writeMessageDeliveryModified(channelId, deliverBody, modifiedProps, modifiedContent); + + return modifiedContent.length; + } + else if(!msgCompressed + && compressionSupported + && contentHeaderBody.getProperties().getEncoding()==null + && bodySize > _protocolSession.getMessageCompressionThreshold() + && (modifiedContent = GZIPUtils.compressBufferToArray(message.getContent(0, bodySize))) != null) + { + BasicContentHeaderProperties modifiedProps = + new BasicContentHeaderProperties(contentHeaderBody.getProperties()); + modifiedProps.setEncoding(GZIP_ENCODING); + + writeMessageDeliveryModified(channelId, deliverBody, modifiedProps, modifiedContent); + + return modifiedContent.length; + } + else + { + writeMessageDeliveryUnchanged(message, contentHeaderBody, channelId, deliverBody, bodySize); + + return bodySize; + } + } - if(bodySize == 0) + private int writeMessageDeliveryModified(final int channelId, + final AMQBody deliverBody, + final BasicContentHeaderProperties modifiedProps, + final byte[] content) + { + final int bodySize; + bodySize = content.length; + ContentHeaderBody modifiedHeaderBody = + new ContentHeaderBody(BASIC_CLASS_ID, 0, modifiedProps, bodySize); + final MessageContentSource wrappedSource = new MessageContentSource() + { + @Override + public int getContent(final ByteBuffer buf, final int offset) + { + int size = Math.min(buf.remaining(), content.length - offset); + buf.put(content, offset, size); + return size; + } + + @Override + public ByteBuffer getContent(final int offset, final int size) + { + return ByteBuffer.wrap(content, offset, size); + } + + @Override + public long getSize() + { + return content.length; + } + }; + writeMessageDeliveryUnchanged(wrappedSource, modifiedHeaderBody, channelId, deliverBody, bodySize); + return bodySize; + } + + private void writeMessageDeliveryUnchanged(final MessageContentSource message, + final ContentHeaderBody contentHeaderBody, + final int channelId, final AMQBody deliverBody, final int bodySize) + { + if (bodySize == 0) { SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody, - contentHeaderBody); + contentHeaderBody); writeFrame(compositeBlock); } @@ -120,13 +199,14 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter int writtenSize = capacity; - AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity); + AMQBody firstContentBody = new MessageContentSourceBody(message, 0, capacity); CompositeAMQBodyBlock - compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody); + compositeBlock = + new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody); writeFrame(compositeBlock); - while(writtenSize < bodySize) + while (writtenSize < bodySize) { capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize; MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity); @@ -137,6 +217,11 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter } } + private boolean isCompressed(final ContentHeaderBody contentHeaderBody) + { + return GZIP_ENCODING.equals(contentHeaderBody.getProperties().getEncoding()); + } + private class MessageContentSourceBody implements AMQBody { public static final byte TYPE = 3; @@ -186,14 +271,14 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter } } - public void writeGetOk(final ServerMessage msg, + public long writeGetOk(final ServerMessage msg, final InstanceProperties props, int channelId, long deliveryTag, int queueSize) { AMQBody deliver = createEncodedGetOkBody(msg, props, deliveryTag, queueSize); - writeMessageDelivery(convertToAMQMessage(msg), channelId, deliver); + return writeMessageDelivery(convertToAMQMessage(msg), channelId, deliver); } |