diff options
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-10-protocol/src')
3 files changed, 75 insertions, 8 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index 7ab3fbb1f5..ec0c38ec42 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_10; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -51,6 +52,7 @@ import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.Option; +import org.apache.qpid.util.GZIPUtils; public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener { @@ -198,7 +200,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC private final AddMessageDispositionListenerAction _postIdSettingAction; - public void send(final MessageInstance entry, boolean batch) + public long send(final MessageInstance entry, boolean batch) { ServerMessage serverMsg = entry.getMessage(); @@ -264,11 +266,44 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC deliveryProps.setRedelivered(entry.isRedelivered()); - Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties()); + boolean msgCompressed = messageProps != null && GZIPUtils.GZIP_CONTENT_ENCODING.equals(messageProps.getContentEncoding()); + + ByteBuffer body = msg.getBody(); - xfr = batch ? new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody(), BATCHED) - : new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody()); + boolean compressionSupported = _session.getConnection().getConnectionDelegate().isCompressionSupported(); + + if(msgCompressed && !compressionSupported) + { + byte[] uncompressed = GZIPUtils.uncompressBufferToArray(body); + if(uncompressed != null) + { + messageProps.setContentEncoding(null); + body = ByteBuffer.wrap(uncompressed); + } + } + else if(!msgCompressed + && compressionSupported + && (messageProps == null || messageProps.getContentEncoding()==null) + && body.remaining() > _session.getConnection().getMessageCompressionThreshold()) + { + byte[] compressed = GZIPUtils.compressBufferToArray(body); + if(compressed != null) + { + if(messageProps == null) + { + messageProps = new MessageProperties(); + } + messageProps.setContentEncoding(GZIPUtils.GZIP_CONTENT_ENCODING); + body = ByteBuffer.wrap(compressed); + } + } + long size = body == null ? 0 : body.remaining(); + + Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties()); + + xfr = batch ? new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header, body, BATCHED) + : new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header, body); if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED) { @@ -311,7 +346,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC { recordUnacknowledged(entry); } - + return size; } void recordUnacknowledged(MessageInstance entry) diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 8ddd04f51a..60bb5c6112 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -74,7 +74,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; private final long _connectionId; private final Object _reference = new Object(); - private VirtualHostImpl _virtualHost; + private VirtualHostImpl<?,?,?> _virtualHost; private Port<?> _port; private AtomicLong _lastIoTime = new AtomicLong(); private boolean _blocking; @@ -87,6 +87,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S new CopyOnWriteArrayList<SessionModelListener>(); private volatile boolean _stopped; + private int _messageCompressionThreshold; public ServerConnection(final long connectionId, Broker broker) { @@ -172,14 +173,22 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S super.setConnectionDelegate(delegate); } - public VirtualHostImpl getVirtualHost() + public VirtualHostImpl<?,?,?> getVirtualHost() { return _virtualHost; } - public void setVirtualHost(VirtualHostImpl virtualHost) + public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost) { _virtualHost = virtualHost; + _messageCompressionThreshold = + virtualHost.getContextValue(Integer.class, + Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE); + + if(_messageCompressionThreshold <= 0) + { + _messageCompressionThreshold = Integer.MAX_VALUE; + } } @Override @@ -639,4 +648,9 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { _taskList.remove(task); } + + public int getMessageCompressionThreshold() + { + return _messageCompressionThreshold; + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index bab2d802e8..cc9d66756b 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -64,6 +64,8 @@ public class ServerConnectionDelegate extends ServerDelegate private final SubjectCreator _subjectCreator; private int _maximumFrameSize; + private boolean _compressionSupported; + public ServerConnectionDelegate(Broker<?> broker, String localFQDN, SubjectCreator subjectCreator) { this(createConnectionProperties(broker), Collections.singletonList((Object)"en_US"), broker, localFQDN, subjectCreator); @@ -111,6 +113,7 @@ public class ServerConnectionDelegate extends ServerDelegate map.put(ServerPropertyNames.VERSION, QpidProperties.getReleaseVersion()); map.put(ServerPropertyNames.QPID_BUILD, QpidProperties.getBuildVersion()); map.put(ServerPropertyNames.QPID_INSTANCE_NAME, broker.getName()); + map.put(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED, String.valueOf(broker.isMessageCompressionEnabled())); return map; } @@ -366,6 +369,16 @@ public class ServerConnectionDelegate extends ServerDelegate public void connectionStartOk(Connection conn, ConnectionStartOk ok) { _clientProperties = ok.getClientProperties(); + if(_clientProperties != null) + { + Object compressionSupported = + _clientProperties.get(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED); + if (compressionSupported != null) + { + _compressionSupported = Boolean.parseBoolean(String.valueOf(compressionSupported)); + + } + } super.connectionStartOk(conn, ok); } @@ -400,4 +413,9 @@ public class ServerConnectionDelegate extends ServerDelegate int delay = (Integer)_broker.getAttribute(Broker.CONNECTION_HEART_BEAT_DELAY); return delay == 0 ? super.getHeartbeatMax() : delay; } + + public boolean isCompressionSupported() + { + return _compressionSupported && _broker.isMessageCompressionEnabled(); + } } |