summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/amqp-0-10-protocol/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-10-protocol/src')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java45
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java20
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java18
3 files changed, 75 insertions, 8 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 7ab3fbb1f5..ec0c38ec42 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -51,6 +52,7 @@ import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.Option;
+import org.apache.qpid.util.GZIPUtils;
public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener
{
@@ -198,7 +200,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
private final AddMessageDispositionListenerAction _postIdSettingAction;
- public void send(final MessageInstance entry, boolean batch)
+ public long send(final MessageInstance entry, boolean batch)
{
ServerMessage serverMsg = entry.getMessage();
@@ -264,11 +266,44 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
deliveryProps.setRedelivered(entry.isRedelivered());
- Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
+ boolean msgCompressed = messageProps != null && GZIPUtils.GZIP_CONTENT_ENCODING.equals(messageProps.getContentEncoding());
+
+ ByteBuffer body = msg.getBody();
- xfr = batch ? new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody(), BATCHED)
- : new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody());
+ boolean compressionSupported = _session.getConnection().getConnectionDelegate().isCompressionSupported();
+
+ if(msgCompressed && !compressionSupported)
+ {
+ byte[] uncompressed = GZIPUtils.uncompressBufferToArray(body);
+ if(uncompressed != null)
+ {
+ messageProps.setContentEncoding(null);
+ body = ByteBuffer.wrap(uncompressed);
+ }
+ }
+ else if(!msgCompressed
+ && compressionSupported
+ && (messageProps == null || messageProps.getContentEncoding()==null)
+ && body.remaining() > _session.getConnection().getMessageCompressionThreshold())
+ {
+ byte[] compressed = GZIPUtils.compressBufferToArray(body);
+ if(compressed != null)
+ {
+ if(messageProps == null)
+ {
+ messageProps = new MessageProperties();
+ }
+ messageProps.setContentEncoding(GZIPUtils.GZIP_CONTENT_ENCODING);
+ body = ByteBuffer.wrap(compressed);
+ }
+ }
+ long size = body == null ? 0 : body.remaining();
+
+ Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
+
+ xfr = batch ? new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header, body, BATCHED)
+ : new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header, body);
if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED)
{
@@ -311,7 +346,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
{
recordUnacknowledged(entry);
}
-
+ return size;
}
void recordUnacknowledged(MessageInstance entry)
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 8ddd04f51a..60bb5c6112 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -74,7 +74,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private final long _connectionId;
private final Object _reference = new Object();
- private VirtualHostImpl _virtualHost;
+ private VirtualHostImpl<?,?,?> _virtualHost;
private Port<?> _port;
private AtomicLong _lastIoTime = new AtomicLong();
private boolean _blocking;
@@ -87,6 +87,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
new CopyOnWriteArrayList<SessionModelListener>();
private volatile boolean _stopped;
+ private int _messageCompressionThreshold;
public ServerConnection(final long connectionId, Broker broker)
{
@@ -172,14 +173,22 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
super.setConnectionDelegate(delegate);
}
- public VirtualHostImpl getVirtualHost()
+ public VirtualHostImpl<?,?,?> getVirtualHost()
{
return _virtualHost;
}
- public void setVirtualHost(VirtualHostImpl virtualHost)
+ public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost)
{
_virtualHost = virtualHost;
+ _messageCompressionThreshold =
+ virtualHost.getContextValue(Integer.class,
+ Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE);
+
+ if(_messageCompressionThreshold <= 0)
+ {
+ _messageCompressionThreshold = Integer.MAX_VALUE;
+ }
}
@Override
@@ -639,4 +648,9 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
{
_taskList.remove(task);
}
+
+ public int getMessageCompressionThreshold()
+ {
+ return _messageCompressionThreshold;
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index bab2d802e8..cc9d66756b 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -64,6 +64,8 @@ public class ServerConnectionDelegate extends ServerDelegate
private final SubjectCreator _subjectCreator;
private int _maximumFrameSize;
+ private boolean _compressionSupported;
+
public ServerConnectionDelegate(Broker<?> broker, String localFQDN, SubjectCreator subjectCreator)
{
this(createConnectionProperties(broker), Collections.singletonList((Object)"en_US"), broker, localFQDN, subjectCreator);
@@ -111,6 +113,7 @@ public class ServerConnectionDelegate extends ServerDelegate
map.put(ServerPropertyNames.VERSION, QpidProperties.getReleaseVersion());
map.put(ServerPropertyNames.QPID_BUILD, QpidProperties.getBuildVersion());
map.put(ServerPropertyNames.QPID_INSTANCE_NAME, broker.getName());
+ map.put(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED, String.valueOf(broker.isMessageCompressionEnabled()));
return map;
}
@@ -366,6 +369,16 @@ public class ServerConnectionDelegate extends ServerDelegate
public void connectionStartOk(Connection conn, ConnectionStartOk ok)
{
_clientProperties = ok.getClientProperties();
+ if(_clientProperties != null)
+ {
+ Object compressionSupported =
+ _clientProperties.get(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED);
+ if (compressionSupported != null)
+ {
+ _compressionSupported = Boolean.parseBoolean(String.valueOf(compressionSupported));
+
+ }
+ }
super.connectionStartOk(conn, ok);
}
@@ -400,4 +413,9 @@ public class ServerConnectionDelegate extends ServerDelegate
int delay = (Integer)_broker.getAttribute(Broker.CONNECTION_HEART_BEAT_DELAY);
return delay == 0 ? super.getHeartbeatMax() : delay;
}
+
+ public boolean isCompressionSupported()
+ {
+ return _compressionSupported && _broker.isMessageCompressionEnabled();
+ }
}