summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-08-16 15:21:41 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-08-16 15:21:41 +0000
commit76cb0102b45908480751581895af1aea251182ac (patch)
tree69b1f052c7781327bcbb832a77b8668c9041ac91
parent579be5dc252220e02e6988458644ea2e60d39f0f (diff)
downloadqpid-python-76cb0102b45908480751581895af1aea251182ac.tar.gz
QPID-6000 : [Java Broker] [Java Client] add the ability to configure automatic message compression
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1618375 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java2
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java12
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java1
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java8
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java5
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java4
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java5
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java45
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java20
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java18
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java47
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java8
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java2
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java22
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java21
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java6
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java121
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java13
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java4
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java24
-rwxr-xr-xjava/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java118
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java34
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java40
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java43
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java35
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java18
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java168
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java5
-rw-r--r--java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java13
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java48
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/FieldTable.java14
-rw-r--r--java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Connection.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/ByteBufferInputStream.java5
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/GZIPUtils.java119
-rw-r--r--java/common/src/test/java/org/apache/qpid/util/GZIPUtilsTest.java102
-rw-r--r--java/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java236
40 files changed, 1205 insertions, 219 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
index f8585344b0..b7be1bfd9b 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
@@ -50,7 +50,7 @@ public interface ConsumerTarget
AMQSessionModel getSessionModel();
- void send(MessageInstance entry, boolean batch);
+ long send(MessageInstance entry, boolean batch);
void flushBatched();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index 982ebb01c6..1a9390f210 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -128,6 +128,18 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
@ManagedAttribute( defaultValue = "false")
boolean getStatisticsReportingResetEnabled();
+ String BROKER_MESSAGE_COMPRESSION_ENABLED = "broker.messageCompressionEnabled";
+ @ManagedContextDefault(name = BROKER_MESSAGE_COMPRESSION_ENABLED)
+ boolean DEFAULT_MESSAGE_COMPRESSION_ENABLED = true;
+
+ @ManagedAttribute( defaultValue = "${"+ BROKER_MESSAGE_COMPRESSION_ENABLED +"}")
+ boolean isMessageCompressionEnabled();
+
+ String MESSAGE_COMPRESSION_THRESHOLD_SIZE = "connection.messageCompressionThresholdSize";
+ @ManagedContextDefault(name = MESSAGE_COMPRESSION_THRESHOLD_SIZE)
+ int DEFAULT_MESSAGE_COMPRESSION_THRESHOLD_SIZE = 102400;
+
+
@DerivedAttribute( persist = true )
String getModelVersion();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
index 7a965c19d7..5b3965904e 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
@@ -43,6 +43,7 @@ public interface Connection<X extends Connection<X>> extends ConfiguredObject<X>
String TRANSPORT = "transport";
String PORT = "port";
+
@DerivedAttribute
String getClientId();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
index 67c713e9d9..af46bae1c4 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
@@ -92,6 +92,8 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
private int _statisticsReportingPeriod;
@ManagedAttributeField
private boolean _statisticsReportingResetEnabled;
+ @ManagedAttributeField
+ private boolean _messageCompressionEnabled;
private State _state = State.UNINITIALIZED;
@@ -360,6 +362,12 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
}
@Override
+ public boolean isMessageCompressionEnabled()
+ {
+ return _messageCompressionEnabled;
+ }
+
+ @Override
public String getModelVersion()
{
return BrokerModel.MODEL_VERSION;
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index d80aa92007..4044c938db 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -471,9 +471,8 @@ class QueueConsumerImpl
public final void send(final QueueEntry entry, final boolean batch)
{
_deliveredCount.incrementAndGet();
- ServerMessage message = entry.getMessage();
- _deliveredBytes.addAndGet(message.getSize());
- _target.send(entry, batch);
+ long size = _target.send(entry, batch);
+ _deliveredBytes.addAndGet(size);
}
@Override
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index 8d025c50dc..ad33ecadcf 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -167,13 +167,15 @@ public class MockConsumer implements ConsumerTarget
{
}
- public void send(MessageInstance entry, boolean batch)
+ public long send(MessageInstance entry, boolean batch)
{
+ long size = entry.getMessage().getSize();
if (messages.contains(entry))
{
entry.setRedelivered();
}
messages.add(entry);
+ return size;
}
public void flushBatched()
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
index ce1c95e674..f13886d2b2 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
@@ -208,10 +208,11 @@ public class StandardQueueTest extends AbstractQueueTestBase
* @param entry
* @param batch
*/
- public void send(MessageInstance entry, boolean batch)
+ public long send(MessageInstance entry, boolean batch)
{
- super.send(entry, batch);
+ long size = super.send(entry, batch);
latch.countDown();
+ return size;
}
};
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 7ab3fbb1f5..ec0c38ec42 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -51,6 +52,7 @@ import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.Option;
+import org.apache.qpid.util.GZIPUtils;
public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener
{
@@ -198,7 +200,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
private final AddMessageDispositionListenerAction _postIdSettingAction;
- public void send(final MessageInstance entry, boolean batch)
+ public long send(final MessageInstance entry, boolean batch)
{
ServerMessage serverMsg = entry.getMessage();
@@ -264,11 +266,44 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
deliveryProps.setRedelivered(entry.isRedelivered());
- Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
+ boolean msgCompressed = messageProps != null && GZIPUtils.GZIP_CONTENT_ENCODING.equals(messageProps.getContentEncoding());
+
+ ByteBuffer body = msg.getBody();
- xfr = batch ? new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody(), BATCHED)
- : new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody());
+ boolean compressionSupported = _session.getConnection().getConnectionDelegate().isCompressionSupported();
+
+ if(msgCompressed && !compressionSupported)
+ {
+ byte[] uncompressed = GZIPUtils.uncompressBufferToArray(body);
+ if(uncompressed != null)
+ {
+ messageProps.setContentEncoding(null);
+ body = ByteBuffer.wrap(uncompressed);
+ }
+ }
+ else if(!msgCompressed
+ && compressionSupported
+ && (messageProps == null || messageProps.getContentEncoding()==null)
+ && body.remaining() > _session.getConnection().getMessageCompressionThreshold())
+ {
+ byte[] compressed = GZIPUtils.compressBufferToArray(body);
+ if(compressed != null)
+ {
+ if(messageProps == null)
+ {
+ messageProps = new MessageProperties();
+ }
+ messageProps.setContentEncoding(GZIPUtils.GZIP_CONTENT_ENCODING);
+ body = ByteBuffer.wrap(compressed);
+ }
+ }
+ long size = body == null ? 0 : body.remaining();
+
+ Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
+
+ xfr = batch ? new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header, body, BATCHED)
+ : new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header, body);
if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED)
{
@@ -311,7 +346,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
{
recordUnacknowledged(entry);
}
-
+ return size;
}
void recordUnacknowledged(MessageInstance entry)
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 8ddd04f51a..60bb5c6112 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -74,7 +74,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private final long _connectionId;
private final Object _reference = new Object();
- private VirtualHostImpl _virtualHost;
+ private VirtualHostImpl<?,?,?> _virtualHost;
private Port<?> _port;
private AtomicLong _lastIoTime = new AtomicLong();
private boolean _blocking;
@@ -87,6 +87,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
new CopyOnWriteArrayList<SessionModelListener>();
private volatile boolean _stopped;
+ private int _messageCompressionThreshold;
public ServerConnection(final long connectionId, Broker broker)
{
@@ -172,14 +173,22 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
super.setConnectionDelegate(delegate);
}
- public VirtualHostImpl getVirtualHost()
+ public VirtualHostImpl<?,?,?> getVirtualHost()
{
return _virtualHost;
}
- public void setVirtualHost(VirtualHostImpl virtualHost)
+ public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost)
{
_virtualHost = virtualHost;
+ _messageCompressionThreshold =
+ virtualHost.getContextValue(Integer.class,
+ Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE);
+
+ if(_messageCompressionThreshold <= 0)
+ {
+ _messageCompressionThreshold = Integer.MAX_VALUE;
+ }
}
@Override
@@ -639,4 +648,9 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
{
_taskList.remove(task);
}
+
+ public int getMessageCompressionThreshold()
+ {
+ return _messageCompressionThreshold;
+ }
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index bab2d802e8..cc9d66756b 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -64,6 +64,8 @@ public class ServerConnectionDelegate extends ServerDelegate
private final SubjectCreator _subjectCreator;
private int _maximumFrameSize;
+ private boolean _compressionSupported;
+
public ServerConnectionDelegate(Broker<?> broker, String localFQDN, SubjectCreator subjectCreator)
{
this(createConnectionProperties(broker), Collections.singletonList((Object)"en_US"), broker, localFQDN, subjectCreator);
@@ -111,6 +113,7 @@ public class ServerConnectionDelegate extends ServerDelegate
map.put(ServerPropertyNames.VERSION, QpidProperties.getReleaseVersion());
map.put(ServerPropertyNames.QPID_BUILD, QpidProperties.getBuildVersion());
map.put(ServerPropertyNames.QPID_INSTANCE_NAME, broker.getName());
+ map.put(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED, String.valueOf(broker.isMessageCompressionEnabled()));
return map;
}
@@ -366,6 +369,16 @@ public class ServerConnectionDelegate extends ServerDelegate
public void connectionStartOk(Connection conn, ConnectionStartOk ok)
{
_clientProperties = ok.getClientProperties();
+ if(_clientProperties != null)
+ {
+ Object compressionSupported =
+ _clientProperties.get(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED);
+ if (compressionSupported != null)
+ {
+ _compressionSupported = Boolean.parseBoolean(String.valueOf(compressionSupported));
+
+ }
+ }
super.connectionStartOk(conn, ok);
}
@@ -400,4 +413,9 @@ public class ServerConnectionDelegate extends ServerDelegate
int delay = (Integer)_broker.getAttribute(Broker.CONNECTION_HEART_BEAT_DELAY);
return delay == 0 ? super.getHeartbeatMax() : delay;
}
+
+ public boolean isCompressionSupported()
+ {
+ return _compressionSupported && _broker.isMessageCompressionEnabled();
+ }
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 1c264e52c6..c193491e1e 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -103,7 +103,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private String _clientProduct = null;
private String _remoteProcessPid = null;
- private VirtualHostImpl _virtualHost;
+ private VirtualHostImpl<?,?,?> _virtualHost;
private final Map<Integer, AMQChannel<AMQProtocolEngine>> _channelMap =
new HashMap<Integer, AMQChannel<AMQProtocolEngine>>();
@@ -175,6 +175,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private volatile boolean _stopped;
private long _readBytes;
private boolean _authenticated;
+ private boolean _compressionSupported;
+ private int _messageCompressionThreshold;
public AMQProtocolEngine(Broker broker,
final NetworkConnection network,
@@ -208,7 +210,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return null;
}
});
-
+
_messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID());
_dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID());
_messagesReceived = new StatisticsCounter("messages-received-" + getSessionID());
@@ -539,6 +541,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_broker.getName());
serverProperties.setString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE,
String.valueOf(_closeWhenNoRoute));
+ serverProperties.setString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED,
+ String.valueOf(_broker.isMessageCompressionEnabled()));
AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(),
(short) pv.getActualMinorVersion(),
@@ -1131,6 +1135,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_logger.debug("Client set closeWhenNoRoute=" + _closeWhenNoRoute + " for protocol engine " + this);
}
}
+ String compressionSupported = clientProperties.getString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED);
+ if (compressionSupported != null)
+ {
+ _compressionSupported = Boolean.parseBoolean(compressionSupported);
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Client set compressionSupported=" + _compressionSupported + " for protocol engine " + this);
+ }
+ }
_clientVersion = clientProperties.getString(ConnectionStartProperties.VERSION_0_8);
_clientProduct = clientProperties.getString(ConnectionStartProperties.PRODUCT);
@@ -1181,17 +1194,24 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return getMethodRegistry();
}
- public VirtualHostImpl getVirtualHost()
+ public VirtualHostImpl<?,?,?> getVirtualHost()
{
return _virtualHost;
}
- public void setVirtualHost(VirtualHostImpl virtualHost) throws AMQException
+ public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost) throws AMQException
{
_virtualHost = virtualHost;
_virtualHost.getConnectionRegistry().registerConnection(this);
+
+ _messageCompressionThreshold = virtualHost.getContextValue(Integer.class,
+ Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE);
+ if(_messageCompressionThreshold <= 0)
+ {
+ _messageCompressionThreshold = Integer.MAX_VALUE;
+ }
}
public void addDeleteTask(Action<? super AMQProtocolEngine> task)
@@ -1595,15 +1615,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
@Override
- public void deliverToClient(final ConsumerImpl sub, final ServerMessage message,
+ public long deliverToClient(final ConsumerImpl sub, final ServerMessage message,
final InstanceProperties props, final long deliveryTag)
{
- registerMessageDelivered(message.getSize());
- _protocolOutputConverter.writeDeliver(message,
+ long size = _protocolOutputConverter.writeDeliver(message,
props,
_channelId,
deliveryTag,
new AMQShortString(sub.getName()));
+ registerMessageDelivered(size);
+ return size;
}
}
@@ -1636,6 +1657,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return _closeWhenNoRoute;
}
+ @Override
+ public boolean isCompressionSupported()
+ {
+ return _compressionSupported && _broker.isMessageCompressionEnabled();
+ }
+
+ @Override
+ public int getMessageCompressionThreshold()
+ {
+ return _messageCompressionThreshold;
+ }
+
public EventLogger getEventLogger()
{
if(_virtualHost != null)
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
index bab0aaf3da..8d5142338a 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
@@ -174,9 +174,9 @@ public interface AMQProtocolSession<T extends AMQProtocolSession<T>>
Object getReference();
- VirtualHostImpl getVirtualHost();
+ VirtualHostImpl<?,?,?> getVirtualHost();
- void setVirtualHost(VirtualHostImpl virtualHost) throws AMQException;
+ void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost) throws AMQException;
public ProtocolOutputConverter getProtocolOutputConverter();
@@ -210,4 +210,8 @@ public interface AMQProtocolSession<T extends AMQProtocolSession<T>>
* can't be routed rather than returning the message.
*/
boolean isCloseWhenNoRoute();
+
+ boolean isCompressionSupported();
+
+ int getMessageCompressionThreshold();
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
index fa26a73f93..c7871e8b9a 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
@@ -26,6 +26,6 @@ import org.apache.qpid.server.message.ServerMessage;
public interface ClientDeliveryMethod
{
- void deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props,
+ long deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props,
final long deliveryTag);
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index 7c2efe64e6..d5eed242e7 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -116,7 +116,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
* @throws org.apache.qpid.AMQException
*/
@Override
- public void send(MessageInstance entry, boolean batch)
+ public long send(MessageInstance entry, boolean batch)
{
// We don't decrement the reference here as we don't want to consume the message
// but we do want to send it to the client.
@@ -124,7 +124,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
synchronized (getChannel())
{
long deliveryTag = getChannel().getNextDeliveryTag();
- sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+ return sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
}
}
@@ -177,7 +177,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
* @param batch
*/
@Override
- public void send(MessageInstance entry, boolean batch)
+ public long send(MessageInstance entry, boolean batch)
{
// if we do not need to wait for client acknowledgements
// we can decrement the reference count immediately.
@@ -194,17 +194,17 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
MessageReference ref = message.newReference();
InstanceProperties props = entry.getInstanceProperties();
entry.delete();
-
+ long size;
synchronized (getChannel())
{
getChannel().getProtocolSession().setDeferFlush(batch);
long deliveryTag = getChannel().getNextDeliveryTag();
- sendToClient(message, props, deliveryTag);
+ size = sendToClient(message, props, deliveryTag);
}
ref.release();
-
+ return size;
}
@@ -291,7 +291,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
* @param batch
*/
@Override
- public void send(MessageInstance entry, boolean batch)
+ public long send(MessageInstance entry, boolean batch)
{
@@ -303,9 +303,9 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
addUnacknowledgedMessage(entry);
recordMessageDelivery(entry, deliveryTag);
entry.addStateChangeListener(getReleasedStateChangeListener());
- sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+ long size = sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
entry.incrementDeliveryCount();
-
+ return size;
}
}
@@ -502,9 +502,9 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
}
}
- protected void sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag)
+ protected long sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag)
{
- _deliveryMethod.deliverToClient(getConsumer(), message, props, deliveryTag);
+ return _deliveryMethod.deliverToClient(getConsumer(), message, props, deliveryTag);
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
index 0026bad063..c3bdedf44d 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
@@ -21,6 +21,9 @@
package org.apache.qpid.server.protocol.v0_8.handler;
+import java.security.AccessControlException;
+import java.util.EnumSet;
+
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -30,26 +33,23 @@ import org.apache.qpid.framing.BasicGetEmptyBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.MessageOnlyCreditManager;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.flow.MessageOnlyCreditManager;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
+import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8;
-import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
-import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import java.security.AccessControlException;
-import java.util.EnumSet;
-
public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody>
{
private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class);
@@ -202,17 +202,18 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
}
@Override
- public void deliverToClient(final ConsumerImpl sub, final ServerMessage message,
+ public long deliverToClient(final ConsumerImpl sub, final ServerMessage message,
final InstanceProperties props, final long deliveryTag)
{
_singleMessageCredit.useCreditForMessage(message.getSize());
- _session.getProtocolOutputConverter().writeGetOk(message,
+ long size =_session.getProtocolOutputConverter().writeGetOk(message,
props,
_channel.getChannelId(),
deliveryTag,
_queue.getQueueDepthMessages());
_deliveredMessage = true;
+ return size;
}
public boolean hasDeliveredMessage()
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java
index 7678ce812b..4ee5cbc17d 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java
@@ -26,7 +26,6 @@
*/
package org.apache.qpid.server.protocol.v0_8.output;
-import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -35,7 +34,6 @@ import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueEntry;
public interface ProtocolOutputConverter
{
@@ -46,12 +44,12 @@ public interface ProtocolOutputConverter
ProtocolOutputConverter newInstance(AMQProtocolSession session);
}
- void writeDeliver(final ServerMessage msg,
+ long writeDeliver(final ServerMessage msg,
final InstanceProperties props, int channelId,
long deliveryTag,
AMQShortString consumerTag);
- void writeGetOk(final ServerMessage msg,
+ long writeGetOk(final ServerMessage msg,
final InstanceProperties props,
int channelId,
long deliveryTag,
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java
index f786cb113a..9e41f7884c 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.server.protocol.v0_8.output;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
@@ -27,6 +31,7 @@ import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicCancelOkBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.BasicGetOkBody;
import org.apache.qpid.framing.BasicReturnBody;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -34,16 +39,13 @@ import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
-import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
+import org.apache.qpid.util.GZIPUtils;
class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
@@ -51,6 +53,7 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
private final MethodRegistry _methodRegistry;
private final AMQProtocolSession _protocolSession;
+ private static final AMQShortString GZIP_ENCODING = AMQShortString.valueOf(GZIPUtils.GZIP_CONTENT_ENCODING);
ProtocolOutputConverterImpl(AMQProtocolSession session, MethodRegistry methodRegistry)
{
@@ -64,7 +67,7 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return _protocolSession;
}
- public void writeDeliver(final ServerMessage m,
+ public long writeDeliver(final ServerMessage m,
final InstanceProperties props, int channelId,
long deliveryTag,
AMQShortString consumerTag)
@@ -72,7 +75,7 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
final AMQMessage msg = convertToAMQMessage(m);
final boolean isRedelivered = Boolean.TRUE.equals(props.getProperty(InstanceProperties.Property.REDELIVERED));
AMQBody deliverBody = createEncodedDeliverBody(msg, isRedelivered, deliveryTag, consumerTag);
- writeMessageDelivery(msg, channelId, deliverBody);
+ return writeMessageDelivery(msg, channelId, deliverBody);
}
private AMQMessage convertToAMQMessage(ServerMessage serverMessage)
@@ -93,21 +96,97 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return MessageConverterRegistry.getConverter(clazz, AMQMessage.class);
}
- private void writeMessageDelivery(AMQMessage message, int channelId, AMQBody deliverBody)
+ private long writeMessageDelivery(AMQMessage message, int channelId, AMQBody deliverBody)
{
- writeMessageDelivery(message, message.getContentHeaderBody(), channelId, deliverBody);
+ return writeMessageDelivery(message, message.getContentHeaderBody(), channelId, deliverBody);
}
- private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
+ private long writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
{
-
int bodySize = (int) message.getSize();
+ boolean msgCompressed = isCompressed(contentHeaderBody);
+ byte[] modifiedContent;
+
+ // straight through case
+ boolean compressionSupported = _protocolSession.isCompressionSupported();
+
+ if(msgCompressed && !compressionSupported &&
+ (modifiedContent = GZIPUtils.uncompressBufferToArray(message.getContent(0,bodySize))) != null)
+ {
+ BasicContentHeaderProperties modifiedProps =
+ new BasicContentHeaderProperties(contentHeaderBody.getProperties());
+ modifiedProps.setEncoding((String)null);
+
+ writeMessageDeliveryModified(channelId, deliverBody, modifiedProps, modifiedContent);
+
+ return modifiedContent.length;
+ }
+ else if(!msgCompressed
+ && compressionSupported
+ && contentHeaderBody.getProperties().getEncoding()==null
+ && bodySize > _protocolSession.getMessageCompressionThreshold()
+ && (modifiedContent = GZIPUtils.compressBufferToArray(message.getContent(0, bodySize))) != null)
+ {
+ BasicContentHeaderProperties modifiedProps =
+ new BasicContentHeaderProperties(contentHeaderBody.getProperties());
+ modifiedProps.setEncoding(GZIP_ENCODING);
+
+ writeMessageDeliveryModified(channelId, deliverBody, modifiedProps, modifiedContent);
+
+ return modifiedContent.length;
+ }
+ else
+ {
+ writeMessageDeliveryUnchanged(message, contentHeaderBody, channelId, deliverBody, bodySize);
+
+ return bodySize;
+ }
+ }
- if(bodySize == 0)
+ private int writeMessageDeliveryModified(final int channelId,
+ final AMQBody deliverBody,
+ final BasicContentHeaderProperties modifiedProps,
+ final byte[] content)
+ {
+ final int bodySize;
+ bodySize = content.length;
+ ContentHeaderBody modifiedHeaderBody =
+ new ContentHeaderBody(BASIC_CLASS_ID, 0, modifiedProps, bodySize);
+ final MessageContentSource wrappedSource = new MessageContentSource()
+ {
+ @Override
+ public int getContent(final ByteBuffer buf, final int offset)
+ {
+ int size = Math.min(buf.remaining(), content.length - offset);
+ buf.put(content, offset, size);
+ return size;
+ }
+
+ @Override
+ public ByteBuffer getContent(final int offset, final int size)
+ {
+ return ByteBuffer.wrap(content, offset, size);
+ }
+
+ @Override
+ public long getSize()
+ {
+ return content.length;
+ }
+ };
+ writeMessageDeliveryUnchanged(wrappedSource, modifiedHeaderBody, channelId, deliverBody, bodySize);
+ return bodySize;
+ }
+
+ private void writeMessageDeliveryUnchanged(final MessageContentSource message,
+ final ContentHeaderBody contentHeaderBody,
+ final int channelId, final AMQBody deliverBody, final int bodySize)
+ {
+ if (bodySize == 0)
{
SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
- contentHeaderBody);
+ contentHeaderBody);
writeFrame(compositeBlock);
}
@@ -120,13 +199,14 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
int writtenSize = capacity;
- AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
+ AMQBody firstContentBody = new MessageContentSourceBody(message, 0, capacity);
CompositeAMQBodyBlock
- compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
+ compositeBlock =
+ new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
writeFrame(compositeBlock);
- while(writtenSize < bodySize)
+ while (writtenSize < bodySize)
{
capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
@@ -137,6 +217,11 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
}
+ private boolean isCompressed(final ContentHeaderBody contentHeaderBody)
+ {
+ return GZIP_ENCODING.equals(contentHeaderBody.getProperties().getEncoding());
+ }
+
private class MessageContentSourceBody implements AMQBody
{
public static final byte TYPE = 3;
@@ -186,14 +271,14 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
}
- public void writeGetOk(final ServerMessage msg,
+ public long writeGetOk(final ServerMessage msg,
final InstanceProperties props,
int channelId,
long deliveryTag,
int queueSize)
{
AMQBody deliver = createEncodedGetOkBody(msg, props, deliveryTag, queueSize);
- writeMessageDelivery(convertToAMQMessage(msg), channelId, deliver);
+ return writeMessageDelivery(convertToAMQMessage(msg), channelId, deliver);
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
index 7f4a3701cd..05ae5285ad 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
@@ -141,13 +141,13 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
{
}
- public void writeDeliver(final ServerMessage msg,
+ public long writeDeliver(final ServerMessage msg,
final InstanceProperties props, int channelId,
long deliveryTag,
AMQShortString consumerTag)
{
_deliveryCount.incrementAndGet();
-
+ long size = msg.getSize();
synchronized (_channelDelivers)
{
Map<String, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId);
@@ -168,14 +168,16 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
consumerDelivers.add(new DeliveryPair(deliveryTag, msg));
}
+ return size;
}
- public void writeGetOk(final ServerMessage msg,
+ public long writeGetOk(final ServerMessage msg,
final InstanceProperties props,
int channelId,
long deliveryTag,
int queueSize)
{
+ return msg.getSize();
}
public void awaitDelivery(int msgs)
@@ -244,11 +246,11 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
@Override
- public void deliverToClient(ConsumerImpl sub, ServerMessage message,
+ public long deliverToClient(ConsumerImpl sub, ServerMessage message,
InstanceProperties props, long deliveryTag)
{
_deliveryCount.incrementAndGet();
-
+ long size = message.getSize();
synchronized (_channelDelivers)
{
Map<String, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId);
@@ -269,6 +271,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
consumerDelivers.add(new DeliveryPair(deliveryTag, message));
}
+ return size;
}
}
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index bceae85896..918a890af5 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -112,10 +112,12 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
}
}
- public void send(MessageInstance entry, boolean batch)
+ public long send(MessageInstance entry, boolean batch)
{
// TODO
+ long size = entry.getMessage().getSize();
send(entry);
+ return size;
}
public void flushBatched()
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
index 5b9bdc7244..3572b98cad 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
@@ -32,6 +32,7 @@ import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.Symbol;
import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
import org.apache.qpid.amqp_1_0.type.messaging.Data;
@@ -43,6 +44,7 @@ import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.codec.BBDecoder;
import org.apache.qpid.typedmessage.TypedBytesContentReader;
import org.apache.qpid.typedmessage.TypedBytesFormatException;
+import org.apache.qpid.util.GZIPUtils;
public abstract class MessageConverter_to_1_0<M extends ServerMessage> implements MessageConverter<M, Message_1_0>
{
@@ -202,7 +204,19 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
SectionEncoder sectionEncoder)
{
final String mimeType = serverMessage.getMessageHeader().getMimeType();
- Section bodySection = getBodySection(serverMessage, mimeType);
+ byte[] data = new byte[(int) serverMessage.getSize()];
+ serverMessage.getContent(ByteBuffer.wrap(data), 0);
+ byte[] uncompressed;
+
+ if(Symbol.valueOf(GZIPUtils.GZIP_CONTENT_ENCODING).equals(metaData.getPropertiesSection().getContentEncoding())
+ && (uncompressed = GZIPUtils.uncompressBufferToArray(ByteBuffer.wrap(data)))!=null)
+ {
+ data = uncompressed;
+ metaData.getPropertiesSection().setContentEncoding(null);
+ }
+
+
+ Section bodySection = convertMessageBody(mimeType, data);
final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection, sectionEncoder);
@@ -279,14 +293,6 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
};
}
- protected Section getBodySection(final M serverMessage, final String mimeType)
- {
- byte[] data = new byte[(int) serverMessage.getSize()];
- serverMessage.getContent(ByteBuffer.wrap(data), 0);
-
- return convertMessageBody(mimeType, data);
- }
-
private ByteBuffer encodeConvertedMessage(MessageMetaData_1_0 metaData, Section bodySection, SectionEncoder sectionEncoder)
{
int headerSize = (int) metaData.getStorableSize();
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
index 4540308f61..fbc24ba454 100755
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
@@ -72,6 +72,17 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData
this(sections, encodeSections(sections, encoder));
}
+ public Properties getPropertiesSection()
+ {
+ return _properties;
+ }
+
+
+ public Header getHeaderSection()
+ {
+ return _header;
+ }
+
private static ArrayList<ByteBuffer> encodeSections(final List<Section> sections, final SectionEncoder encoder)
{
ArrayList<ByteBuffer> encodedSections = new ArrayList<ByteBuffer>(sections.size());
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 7d9dfcd600..b64d355f80 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -20,6 +20,39 @@
*/
package org.apache.qpid.client;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.ConnectException;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.nio.channels.UnresolvedAddressException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import javax.naming.StringRefAddr;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,38 +82,6 @@ import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.URLSyntaxException;
-import javax.jms.ConnectionConsumer;
-import javax.jms.ConnectionMetaData;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueSession;
-import javax.jms.ServerSessionPool;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
-import javax.naming.NamingException;
-import javax.naming.Reference;
-import javax.naming.Referenceable;
-import javax.naming.StringRefAddr;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.net.ConnectException;
-import java.net.SocketAddress;
-import java.net.UnknownHostException;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
@@ -191,6 +192,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
//Address resolution purposes
private volatile long _lastFailoverTime = 0;
+ private boolean _compressMessages;
+ private int _messageCompressionThresholdSize;
+
/**
* @param broker brokerdetails
* @param username username
@@ -325,6 +329,31 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
Boolean.parseBoolean(System.getProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "false"));
}
+ if(connectionURL.getOption(ConnectionURL.OPTIONS_COMPRESS_MESSAGES) != null)
+ {
+ _compressMessages = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_COMPRESS_MESSAGES));
+ }
+ else
+ {
+ _compressMessages =
+ Boolean.parseBoolean(System.getProperty(ClientProperties.CONNECTION_OPTION_COMPRESS_MESSAGES,
+ String.valueOf(ClientProperties.DEFAULT_CONNECTION_OPTION_COMPRESS_MESSAGES)));
+ }
+
+
+ if(connectionURL.getOption(ConnectionURL.OPTIONS_MESSAGES_COMPRESSION_THRESHOLD_SIZE) != null)
+ {
+ _messageCompressionThresholdSize = Integer.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_MESSAGES_COMPRESSION_THRESHOLD_SIZE));
+ }
+ else
+ {
+ _messageCompressionThresholdSize = Integer.getInteger(ClientProperties.CONNECTION_OPTION_MESSAGE_COMPRESSION_THRESHOLD_SIZE,
+ ClientProperties.DEFAULT_MESSAGE_COMPRESSION_THRESHOLD_SIZE);
+ }
+ if(_messageCompressionThresholdSize <= 0)
+ {
+ _messageCompressionThresholdSize = Integer.MAX_VALUE;
+ }
String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10");
if (_logger.isDebugEnabled())
@@ -449,16 +478,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- if ((message == null) || message.equals(""))
+ if (message == null)
{
- if (message == null)
- {
- message = "Unable to Connect";
- }
- else // can only be "" if getMessage() returned it therfore lastException != null
- {
- message = "Unable to Connect:" + connectionException.getClass();
- }
+ message = "Unable to Connect";
+ }
+ else if("".equals(message))
+ {
+ message = "Unable to Connect:" + connectionException.getClass();
}
for (Throwable th = connectionException; th != null; th = th.getCause())
@@ -1543,6 +1569,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _syncPublish;
}
+ public boolean isMessageCompressionDesired()
+ {
+ return _compressMessages;
+ }
+
public int getNextChannelID()
{
return _sessions.getNextChannelId();
@@ -1615,4 +1646,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return super.setClosed();
}
+
+ public int getMessageCompressionThresholdSize()
+ {
+ return _messageCompressionThresholdSize;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index 0329deee03..74ca1ed74f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -20,6 +20,11 @@
*/
package org.apache.qpid.client;
+import java.io.IOException;
+
+import javax.jms.JMSException;
+import javax.jms.XASession;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -27,10 +32,6 @@ import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Session;
-import javax.jms.JMSException;
-import javax.jms.XASession;
-import java.io.IOException;
-
public interface AMQConnectionDelegate
{
ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException;
@@ -82,4 +83,6 @@ public interface AMQConnectionDelegate
void setHeartbeatListener(HeartbeatListener listener);
boolean supportsIsBound();
+
+ boolean isMessageCompressionSupported();
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 95b1178407..4e9164c3b0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -21,6 +21,17 @@
package org.apache.qpid.client;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.XASession;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,7 +40,6 @@ import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.transport.ClientConnectionDelegate;
import org.apache.qpid.common.ServerPropertyNames;
-
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
@@ -48,16 +58,6 @@ import org.apache.qpid.transport.SessionDetachCode;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.TransportException;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.XASession;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ConnectionListener
{
/**
@@ -441,7 +441,11 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
try
{
clientProps.put(ConnectionStartProperties.CLIENT_ID_0_10, _conn.getClientID());
- conSettings.setClientProperties(clientProps);
+ if(_conn.isMessageCompressionDesired())
+ {
+ clientProps.put(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED, Boolean.TRUE.toString());
+ }
+ conSettings.setClientProperties(clientProps);
}
catch (JMSException e)
{
@@ -504,4 +508,10 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
//0-10 supports the isBound method
return true;
}
+
+ @Override
+ public boolean isMessageCompressionSupported()
+ {
+ return _qpidConnection.isMessageCompressionSupported();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index dfbf7ec60a..5242629a91 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -20,8 +20,21 @@
*/
package org.apache.qpid.client;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.nio.channels.UnresolvedAddressException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import javax.jms.JMSException;
+import javax.jms.XASession;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -49,22 +62,11 @@ import org.apache.qpid.transport.network.Transport;
import org.apache.qpid.transport.network.security.SecurityLayer;
import org.apache.qpid.transport.network.security.SecurityLayerFactory;
-import javax.jms.JMSException;
-import javax.jms.XASession;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.nio.channels.UnresolvedAddressException;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.Iterator;
-import java.util.Set;
-
public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class);
private final AMQConnection _conn;
+ private boolean _messageCompressionSupported;
public void closeConnection(long timeout) throws JMSException, AMQException
{
@@ -139,6 +141,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
_conn.getFailoverPolicy().attainedConnection();
_conn.setConnected(true);
_conn.logConnected(network.getLocalAddress(), network.getRemoteAddress());
+ _messageCompressionSupported = checkMessageCompressionSupported();
return null;
}
else
@@ -413,4 +416,17 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
return connectedToQpid;
}
+
+ private boolean checkMessageCompressionSupported()
+ {
+ FieldTable serverProperties = _conn.getProtocolHandler().getProtocolSession().getConnectionStartServerProperties();
+ return serverProperties != null
+ && Boolean.parseBoolean(serverProperties.getString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED));
+
+ }
+
+ public boolean isMessageCompressionSupported()
+ {
+ return _messageCompressionSupported;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 9efc670e99..8ce3d662d4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -17,6 +17,19 @@
*/
package org.apache.qpid.client;
+import static org.apache.qpid.transport.Option.NONE;
+import static org.apache.qpid.transport.Option.SYNC;
+import static org.apache.qpid.transport.Option.UNRELIABLE;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,25 +49,18 @@ import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.util.GZIPUtils;
import org.apache.qpid.util.Strings;
-import static org.apache.qpid.transport.Option.NONE;
-import static org.apache.qpid.transport.Option.SYNC;
-import static org.apache.qpid.transport.Option.UNRELIABLE;
-
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
/**
* This is a 0_10 message producer.
*/
public class BasicMessageProducer_0_10 extends BasicMessageProducer
{
+
+ // TODO - move and add properties to change this
+ private static final int MESSAGE_COMPRESSION_THRESHOLD_SIZE = 4096;
+
private static final Logger _logger = LoggerFactory.getLogger(BasicMessageProducer_0_10.class);
private byte[] userIDBytes;
@@ -204,6 +210,19 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
}
ByteBuffer data = message.getData();
+
+ if(data.remaining() > getConnection().getMessageCompressionThresholdSize() && getConnection().getDelegate().isMessageCompressionSupported()
+ && getConnection().isMessageCompressionDesired() && messageProps.getContentEncoding() == null)
+ {
+ byte[] compressed = GZIPUtils.compressBufferToArray(data);
+ if(compressed != null)
+ {
+ messageProps.setContentEncoding(GZIPUtils.GZIP_CONTENT_ENCODING);
+ data = ByteBuffer.wrap(compressed);
+ }
+ }
+
+
messageProps.setContentLength(data == null ? 0 : data.remaining());
// send the message
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index b9bb03444f..fedb8e088c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -20,8 +20,11 @@
*/
package org.apache.qpid.client;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.UUID;
+import java.util.zip.GZIPOutputStream;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -147,7 +150,37 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
contentHeaderProperties.setPriority((byte) priority);
- final int size = (payload != null) ? payload.limit() : 0;
+ int size = (payload != null) ? payload.remaining() : 0;
+
+ if(size > getConnection().getMessageCompressionThresholdSize() && getConnection().getDelegate().isMessageCompressionSupported()
+ && getConnection().isMessageCompressionDesired() && contentHeaderProperties.getEncoding() == null)
+ {
+ contentHeaderProperties.setEncoding("gzip");
+ try(ByteArrayOutputStream compressedOutputBuffer = new ByteArrayOutputStream(size / 2))
+ {
+ try (GZIPOutputStream output = new GZIPOutputStream(compressedOutputBuffer))
+ {
+ if(payload.hasArray())
+ {
+ output.write(payload.array(),payload.position()+payload.arrayOffset(),payload.remaining());
+ }
+ else
+ {
+ byte[] tmp = new byte[size];
+ payload.get(tmp);
+ output.write(tmp);
+ }
+ }
+
+ byte[] compressedData = compressedOutputBuffer.toByteArray();
+ payload = ByteBuffer.wrap(compressedData);
+ size = compressedData.length;
+ }
+ catch (IOException e)
+ {
+ // TODO - shouldn't happen
+ }
+ }
final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java b/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java
index baae072167..e8343fda0a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java
@@ -18,11 +18,12 @@
*/
package org.apache.qpid.client.handler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.properties.ConnectionStartProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Used during connection establishment to optionally set the "close when no route" client property
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
index b0c30f82fa..2e817f2966 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
@@ -20,6 +20,13 @@
*/
package org.apache.qpid.client.handler;
+import java.io.UnsupportedEncodingException;
+import java.util.StringTokenizer;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,12 +47,6 @@ import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.properties.ConnectionStartProperties;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-import java.io.UnsupportedEncodingException;
-import java.util.StringTokenizer;
-
public class ConnectionStartMethodHandler implements StateAwareMethodListener<ConnectionStartBody>
{
private static final Logger _log = LoggerFactory.getLogger(ConnectionStartMethodHandler.class);
@@ -173,6 +174,9 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co
ConnectionURL url = getConnectionURL(session);
_closeWhenNoRouteHelper.setClientProperties(clientProperties, url, serverProperties);
+ clientProperties.setString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED,
+ String.valueOf(session.getAMQConnection().isMessageCompressionDesired()));
+
ConnectionStartOkBody connectionStartOkBody = session.getMethodRegistry().createConnectionStartOkBody(clientProperties,new AMQShortString(mechanism),saslResponse,new AMQShortString(locales));
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -188,7 +192,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co
else
{
_log.error("Broker requested Protocol [" + body.getVersionMajor() + "-" + body.getVersionMinor()
- + "] which is not supported by this version of the client library");
+ + "] which is not supported by this version of the client library");
session.closeProtocolSession();
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
index e52ff9acb2..71d07b1fa0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
@@ -20,6 +20,17 @@
*/
package org.apache.qpid.client.message;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
+
+import javax.jms.JMSException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,16 +39,11 @@ import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession_0_8;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.MessageProperties;
-
-import javax.jms.JMSException;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.List;
+import org.apache.qpid.util.GZIPUtils;
public abstract class AbstractJMSMessageFactory implements MessageFactory
{
@@ -52,46 +58,57 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
ByteBuffer data;
final boolean debug = _logger.isDebugEnabled();
- // we optimise the non-fragmented case to avoid copying
- if ((bodies != null) && (bodies.size() == 1))
- {
- if (debug)
- {
- _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.getBodySize() + ")");
- }
+ byte[] uncompressed;
- data = ByteBuffer.wrap(((ContentBody) bodies.get(0)).getPayload());
+ if(GZIPUtils.GZIP_CONTENT_ENCODING.equals(contentHeader.getProperties().getEncodingAsString())
+ && (uncompressed = GZIPUtils.uncompressStreamToArray(new BodyInputStream(bodies))) != null )
+ {
+ contentHeader.getProperties().setEncoding((String)null);
+ data = ByteBuffer.wrap(uncompressed);
}
- else if (bodies != null)
+ else
{
- if (debug)
+ // we optimise the non-fragmented case to avoid copying
+ if ((bodies != null) && (bodies.size() == 1))
{
- _logger.debug("Fragmented message body (" + bodies
- .size() + " frames, bodySize=" + contentHeader.getBodySize() + ")");
- }
+ if (debug)
+ {
+ _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.getBodySize() + ")");
+ }
- data = ByteBuffer.allocate((int) contentHeader.getBodySize()); // XXX: Is cast a problem?
- final Iterator it = bodies.iterator();
- while (it.hasNext())
+ data = ByteBuffer.wrap(((ContentBody) bodies.get(0)).getPayload());
+ }
+ else if (bodies != null)
{
- ContentBody cb = (ContentBody) it.next();
- final ByteBuffer payload = ByteBuffer.wrap(cb.getPayload());
- if(payload.isDirect() || payload.isReadOnly())
+ if (debug)
{
- data.put(payload);
+ _logger.debug("Fragmented message body (" + bodies
+ .size() + " frames, bodySize=" + contentHeader.getBodySize() + ")");
}
- else
+
+ data = ByteBuffer.allocate((int) contentHeader.getBodySize()); // XXX: Is cast a problem?
+ final Iterator it = bodies.iterator();
+ while (it.hasNext())
{
- data.put(payload.array(), payload.arrayOffset(), payload.limit());
+ ContentBody cb = (ContentBody) it.next();
+ final ByteBuffer payload = ByteBuffer.wrap(cb.getPayload());
+ if (payload.isDirect() || payload.isReadOnly())
+ {
+ data.put(payload);
+ }
+ else
+ {
+ data.put(payload.array(), payload.arrayOffset(), payload.limit());
+ }
+
}
+ data.flip();
+ }
+ else // bodies == null
+ {
+ data = ByteBuffer.allocate(0);
}
-
- data.flip();
- }
- else // bodies == null
- {
- data = ByteBuffer.allocate(0);
}
if (debug)
@@ -132,22 +149,42 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
_logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data
.remaining());
}
+ if(GZIPUtils.GZIP_CONTENT_ENCODING.equals(msgProps.getContentEncoding()))
+ {
+ byte[] uncompressed = GZIPUtils.uncompressBufferToArray(data);
+ if(uncompressed != null)
+ {
+ msgProps.setContentEncoding(null);
+ data = ByteBuffer.wrap(uncompressed);
+ }
+ }
AMQMessageDelegate delegate = new AMQMessageDelegate_0_10(msgProps, deliveryProps, messageNbr);
AbstractJMSMessage message = createMessage(delegate, data);
return message;
}
- private static final String asString(byte[] bytes)
+ private ByteBuffer uncompressBody(final InputStream bodyInputStream) throws AMQException
{
- if (bytes == null)
+ final ByteBuffer data;
+ try(GZIPInputStream gzipInputStream = new GZIPInputStream(bodyInputStream))
{
- return null;
+ ByteArrayOutputStream uncompressedBuffer = new ByteArrayOutputStream();
+ int read;
+ byte[] buf = new byte[4096];
+ while((read = gzipInputStream.read(buf))!=-1)
+ {
+ uncompressedBuffer.write(buf,0,read);
+ }
+ byte[] uncompressedBytes = uncompressedBuffer.toByteArray();
+ data = ByteBuffer.wrap(uncompressedBytes);
}
- else
+ catch (IOException e)
{
- return new String(bytes);
+ // TODO - shouldn't happen
+ throw new AMQException("Error uncompressing gzipped message data", e);
}
+ return data;
}
@@ -174,4 +211,57 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
return msg;
}
+ private class BodyInputStream extends InputStream
+ {
+ private final Iterator<ContentBody> _bodiesIter;
+ private byte[] _currentBuffer;
+ private int _currentPos;
+ public BodyInputStream(final List<ContentBody> bodies)
+ {
+ _bodiesIter = bodies.iterator();
+ _currentBuffer = _bodiesIter.next().getPayload();
+ _currentPos = 0;
+ }
+
+ @Override
+ public int read() throws IOException
+ {
+ byte[] buf = new byte[1];
+ int size = read(buf);
+ if(size == -1)
+ {
+ throw new EOFException();
+ }
+ else
+ {
+ return ((int)buf[0])&0xff;
+ }
+ }
+
+ @Override
+ public int read(final byte[] dst, final int off, final int len)
+ {
+ while(_currentPos == _currentBuffer.length)
+ {
+ if(!_bodiesIter.hasNext())
+ {
+ return -1;
+ }
+ else
+ {
+ _currentBuffer = _bodiesIter.next().getPayload();
+ _currentPos = 0;
+ }
+ }
+ int size = Math.min(len, _currentBuffer.length - _currentPos);
+ System.arraycopy(_currentBuffer,_currentPos, dst,off,size);
+ _currentPos+=size;
+ return size;
+ }
+
+ @Override
+ public void close()
+ {
+ }
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
index 2901a5f983..754b90c372 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
@@ -70,6 +70,11 @@ public interface ConnectionURL
*/
public static final String OPTIONS_CLOSE_WHEN_NO_ROUTE = "closeWhenNoRoute";
+
+ public static final String OPTIONS_COMPRESS_MESSAGES = "compressMessages";
+ public static final String OPTIONS_MESSAGES_COMPRESSION_THRESHOLD_SIZE = "messageCompressionThresholdSize";
+
+
public static final String OPTIONS_DEFAULT_TOPIC_EXCHANGE = "defaultTopicExchange";
public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange";
public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange";
diff --git a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
index f10961c092..24ec496cc9 100644
--- a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
+++ b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
@@ -254,6 +254,19 @@ public class ClientProperties
public static final String CONNECTION_OPTION_SSL_VERIFY_HOST_NAME = "qpid.connection_ssl_verify_hostname";
public static final boolean DEFAULT_CONNECTION_OPTION_SSL_VERIFY_HOST_NAME = true;
+ /**
+ * System property to set a default value for a connection option 'compress_messages'
+ */
+ public static final String CONNECTION_OPTION_COMPRESS_MESSAGES = "qpid.connection_compress_messages";
+ public static final boolean DEFAULT_CONNECTION_OPTION_COMPRESS_MESSAGES = false;
+
+
+ /**
+ * System property to set a default value for a connection option 'message_compression_threshold_size'
+ */
+ public static final String CONNECTION_OPTION_MESSAGE_COMPRESSION_THRESHOLD_SIZE = "qpid.message_compression_threshold_size";
+ public static final int DEFAULT_MESSAGE_COMPRESSION_THRESHOLD_SIZE = 102400;
+
private ClientProperties()
{
//No instances
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
index fe8c94cee1..b490aee898 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
@@ -20,13 +20,13 @@
*/
package org.apache.qpid.framing;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class BasicContentHeaderProperties
{
//persistent & non-persistent constants, values as per JMS DeliveryMode
@@ -83,8 +83,48 @@ public class BasicContentHeaderProperties
private byte[] _encodedForm;
+ public BasicContentHeaderProperties(BasicContentHeaderProperties other)
+ {
+ if(other._headers != null)
+ {
+ byte[] encodedHeaders = other._headers.getDataAsBytes();
+
+ _headers = new FieldTable(encodedHeaders,0,encodedHeaders.length);
+
+ }
+
+ _contentType = other._contentType;
+
+ _encoding = other._encoding;
+
+ _deliveryMode = other._deliveryMode;
+
+ _priority = other._priority;
+
+ _correlationId = other._correlationId;
+
+ _replyTo = other._replyTo;
+
+ _expiration = other._expiration;
+
+ _messageId = other._messageId;
+
+ _timestamp = other._timestamp;
+
+ _type = other._type;
+
+ _userId = other._userId;
+
+ _appId = other._appId;
+
+ _clusterId = other._clusterId;
+
+ _propertyFlags = other._propertyFlags;
+ }
+
public BasicContentHeaderProperties()
- { }
+ {
+ }
public int getPropertyListSize()
{
diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
index c4220894a8..9a455ce868 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
@@ -84,7 +84,7 @@ public class FieldTable
_encodedSize = length;
}
- public FieldTable(byte[] encodedForm, int offset, int length) throws IOException
+ public FieldTable(byte[] encodedForm, int offset, int length)
{
this();
_encodedForm = encodedForm;
@@ -858,7 +858,17 @@ public class FieldTable
}
}
- return _encodedForm.clone();
+ else if(_encodedFormOffset == 0 && _encodedSize == _encodedForm.length)
+ {
+ return _encodedForm.clone();
+ }
+ else
+ {
+ byte[] encodedCopy = new byte[(int) _encodedSize];
+ System.arraycopy(_encodedForm,_encodedFormOffset,encodedCopy,0,(int)_encodedSize);
+ return encodedCopy;
+ }
+
}
public long getEncodedSize()
diff --git a/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java b/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
index b2bcc1836e..8f1a1d0be0 100644
--- a/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
+++ b/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
@@ -41,6 +41,9 @@ public class ConnectionStartProperties
*/
public static final String QPID_CLOSE_WHEN_NO_ROUTE = "qpid.close_when_no_route";
+ public static final String QPID_MESSAGE_COMPRESSION_SUPPORTED = "qpid.message_compression_supported";
+
+
public static final String CLIENT_ID_0_10 = "clientName";
public static final String CLIENT_ID_0_8 = "instance";
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 44cb30e735..99fc02c959 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -41,6 +41,7 @@ import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslServer;
import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
import org.apache.qpid.transport.network.InputHandler;
@@ -78,6 +79,7 @@ public class Connection extends ConnectionInvoker
private long _lastReadTime;
private NetworkConnection _networkConnection;
private FrameSizeObserver _frameSizeObserver;
+ private boolean _messageCompressionSupported;
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
@@ -699,6 +701,7 @@ public class Connection extends ConnectionInvoker
public void setServerProperties(final Map<String, Object> serverProperties)
{
_serverProperties = serverProperties == null ? Collections.<String, Object>emptyMap() : serverProperties;
+ _messageCompressionSupported = Boolean.parseBoolean(String.valueOf(_serverProperties.get(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED)));
}
public Map<String, Object> getServerProperties()
@@ -848,4 +851,9 @@ public class Connection extends ConnectionInvoker
};
}
}
+
+ public boolean isMessageCompressionSupported()
+ {
+ return _messageCompressionSupported;
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/util/ByteBufferInputStream.java b/java/common/src/main/java/org/apache/qpid/util/ByteBufferInputStream.java
index b72b342187..14b804f8c0 100644
--- a/java/common/src/main/java/org/apache/qpid/util/ByteBufferInputStream.java
+++ b/java/common/src/main/java/org/apache/qpid/util/ByteBufferInputStream.java
@@ -92,4 +92,9 @@ public class ByteBufferInputStream extends InputStream
{
return _buffer.remaining();
}
+
+ @Override
+ public void close()
+ {
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/util/GZIPUtils.java b/java/common/src/main/java/org/apache/qpid/util/GZIPUtils.java
new file mode 100644
index 0000000000..b5ba0b29af
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/util/GZIPUtils.java
@@ -0,0 +1,119 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GZIPUtils
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(GZIPUtils.class);
+
+ public static final String GZIP_CONTENT_ENCODING = "gzip";
+
+
+ /**
+ * Return a new byte array with the compressed contents of the input buffer
+ *
+ * @param input byte buffer to compress
+ * @return a byte array containing the compressed data, or null if the input was null or there was an unexpected
+ * IOException while compressing
+ */
+ public static byte[] compressBufferToArray(ByteBuffer input)
+ {
+ if(input != null)
+ {
+ try (ByteArrayOutputStream compressedBuffer = new ByteArrayOutputStream())
+ {
+ try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(compressedBuffer))
+ {
+ if (input.hasArray())
+ {
+ gzipOutputStream.write(input.array(),
+ input.arrayOffset() + input.position(),
+ input.remaining());
+ }
+ else
+ {
+
+ byte[] data = new byte[input.remaining()];
+
+ input.duplicate().get(data);
+
+ gzipOutputStream.write(data);
+ }
+ }
+ return compressedBuffer.toByteArray();
+ }
+ catch (IOException e)
+ {
+ LOGGER.warn("Unexpected IOException when attempting to compress with gzip", e);
+ }
+ }
+ return null;
+ }
+
+ public static byte[] uncompressBufferToArray(ByteBuffer contentBuffer)
+ {
+ if(contentBuffer != null)
+ {
+ try (ByteBufferInputStream input = new ByteBufferInputStream(contentBuffer))
+ {
+ return uncompressStreamToArray(input);
+ }
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public static byte[] uncompressStreamToArray(InputStream stream)
+ {
+ if(stream != null)
+ {
+ try (GZIPInputStream gzipInputStream = new GZIPInputStream(stream))
+ {
+ ByteArrayOutputStream inflatedContent = new ByteArrayOutputStream();
+ int read;
+ byte[] buf = new byte[4096];
+ while ((read = gzipInputStream.read(buf)) != -1)
+ {
+ inflatedContent.write(buf, 0, read);
+ }
+ return inflatedContent.toByteArray();
+ }
+ catch (IOException e)
+ {
+
+ LOGGER.warn("Unexpected IOException when attempting to uncompress with gzip", e);
+ }
+ }
+ return null;
+ }
+}
diff --git a/java/common/src/test/java/org/apache/qpid/util/GZIPUtilsTest.java b/java/common/src/test/java/org/apache/qpid/util/GZIPUtilsTest.java
new file mode 100644
index 0000000000..60e80da15f
--- /dev/null
+++ b/java/common/src/test/java/org/apache/qpid/util/GZIPUtilsTest.java
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import junit.framework.TestCase;
+
+public class GZIPUtilsTest extends TestCase
+{
+ public void testCompressUncompress() throws Exception
+ {
+ byte[] data = new byte[1024];
+ Arrays.fill(data, (byte)'a');
+ byte[] compressed = GZIPUtils.compressBufferToArray(ByteBuffer.wrap(data));
+ assertTrue("Compression didn't compress", compressed.length < data.length);
+ byte[] uncompressed = GZIPUtils.uncompressBufferToArray(ByteBuffer.wrap(compressed));
+ assertTrue("Compression not reversible", Arrays.equals(data,uncompressed));
+ }
+
+ public void testUncompressNonZipReturnsNull() throws Exception
+ {
+ byte[] data = new byte[1024];
+ Arrays.fill(data, (byte)'a');
+ assertNull("Non zipped data should not uncompress", GZIPUtils.uncompressBufferToArray(ByteBuffer.wrap(data)));
+ }
+
+ public void testUncompressStreamWithErrorReturnsNull() throws Exception
+ {
+ InputStream is = new InputStream()
+ {
+ @Override
+ public int read() throws IOException
+ {
+ throw new IOException();
+ }
+ };
+ assertNull("Stream error should return null", GZIPUtils.uncompressStreamToArray(is));
+ }
+
+
+ public void testUncompressNullStreamReturnsNull() throws Exception
+ {
+ assertNull("Null Stream should return null", GZIPUtils.uncompressStreamToArray(null));
+ }
+ public void testUncompressNullBufferReturnsNull() throws Exception
+ {
+ assertNull("Null buffer should return null", GZIPUtils.uncompressBufferToArray(null));
+ }
+
+ public void testCompressNullArrayReturnsNull()
+ {
+ assertNull(GZIPUtils.compressBufferToArray(null));
+ }
+
+ public void testNonHeapBuffers() throws Exception
+ {
+
+ byte[] data = new byte[1024];
+ Arrays.fill(data, (byte)'a');
+ ByteBuffer directBuffer = ByteBuffer.allocateDirect(1024);
+ directBuffer.put(data);
+ directBuffer.flip();
+
+ byte[] compressed = GZIPUtils.compressBufferToArray(directBuffer);
+
+ assertTrue("Compression didn't compress", compressed.length < data.length);
+
+ directBuffer.clear();
+ directBuffer.position(1);
+ directBuffer = directBuffer.slice();
+ directBuffer.put(compressed);
+ directBuffer.flip();
+
+ byte[] uncompressed = GZIPUtils.uncompressBufferToArray(directBuffer);
+
+ assertTrue("Compression not reversible", Arrays.equals(data,uncompressed));
+
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java b/java/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java
new file mode 100644
index 0000000000..e1fca306ce
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java
@@ -0,0 +1,236 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.NamingException;
+
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.server.management.plugin.HttpManagement;
+import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Plugin;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.systest.rest.RestTestHelper;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+import org.apache.qpid.url.URLSyntaxException;
+
+public class MessageCompressionTest extends QpidBrokerTestCase
+{
+ private RestTestHelper _restTestHelper = new RestTestHelper(findFreePort());
+
+ @Override
+ public void setUp() throws Exception
+ {
+ // do nothing - only call setup after props set
+ }
+
+ public void doActualSetUp() throws Exception
+ {
+ // use webadmin account to perform tests
+ _restTestHelper.setUsernameAndPassword("webadmin", "webadmin");
+
+ TestBrokerConfiguration config = getBrokerConfiguration();
+ config.addHttpManagementConfiguration();
+ config.setObjectAttribute(Port.class,
+ TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT,
+ Port.PORT,
+ _restTestHelper.getHttpPort());
+
+ config.setObjectAttribute(AuthenticationProvider.class, TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER,
+ "secureOnlyMechanisms",
+ "{}");
+
+ // set password authentication provider on http port for the tests
+ config.setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT, Port.AUTHENTICATION_PROVIDER,
+ TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER);
+ config.setObjectAttribute(Plugin.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_MANAGEMENT, HttpManagement.HTTP_BASIC_AUTHENTICATION_ENABLED, true);
+
+ super.setUp();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ _restTestHelper.tearDown();
+ }
+ }
+
+ public void testSenderCompressesReceiverUncompresses() throws Exception
+ {
+ doTestCompression(true, true, true);
+ }
+
+ public void testSenderCompressesOnly() throws Exception
+ {
+ doTestCompression(true, false, true);
+
+ }
+
+ public void testReceiverUncompressesOnly() throws Exception
+ {
+ doTestCompression(false, true, true);
+
+ }
+
+ public void testNoCompression() throws Exception
+ {
+ doTestCompression(false, false, true);
+
+ }
+
+
+ public void testDisablingCompressionAtBroker() throws Exception
+ {
+ doTestCompression(true, true, false);
+ }
+
+
+ private void doTestCompression(final boolean senderCompresses,
+ final boolean receiverUncompresses,
+ final boolean brokerCompressionEnabled) throws Exception
+ {
+
+ setTestSystemProperty(Broker.BROKER_MESSAGE_COMPRESSION_ENABLED, String.valueOf(brokerCompressionEnabled));
+
+ doActualSetUp();
+
+ String messageText = createMessageText();
+ Connection senderConnection = getConnection(senderCompresses);
+ String virtualPath = getConnectionFactory().getVirtualPath();
+ String testQueueName = getTestQueueName();
+
+ // create the queue using REST and bind it
+ assertEquals(201,
+ _restTestHelper.submitRequest("/api/latest/queue"
+ + virtualPath
+ + virtualPath
+ + "/"
+ + testQueueName, "PUT", Collections.<String, Object>emptyMap()));
+ assertEquals(201,
+ _restTestHelper.submitRequest("/api/latest/binding"
+ + virtualPath
+ + virtualPath
+ + "/amq.direct/"
+ + testQueueName
+ + "/"
+ + testQueueName, "PUT", Collections.<String, Object>emptyMap()));
+
+ Session session = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // send a large message
+ MessageProducer producer = session.createProducer(getTestQueue());
+ TextMessage sentMessage = session.createTextMessage(messageText);
+ sentMessage.setStringProperty("bar", "foo");
+
+ producer.send(sentMessage);
+ ((AMQSession)session).sync();
+
+ // get the number of bytes received at the broker on the connection
+ List<Map<String, Object>> connectionRestOutput = _restTestHelper.getJsonAsList("/api/latest/connection");
+ assertEquals(1, connectionRestOutput.size());
+ Map statistics = (Map) connectionRestOutput.get(0).get("statistics");
+ int bytesIn = (Integer) statistics.get("bytesIn");
+
+ // if sending compressed then the bytesIn statistic for the connection should reflect the compressed size of the
+ // message
+ if(senderCompresses && brokerCompressionEnabled)
+ {
+ assertTrue("Message was not sent compressed", bytesIn < messageText.length());
+ }
+ else
+ {
+ assertFalse("Message was incorrectly sent compressed", bytesIn < messageText.length());
+ }
+ senderConnection.close();
+
+ // receive the message
+ Connection consumerConnection = getConnection(receiverUncompresses);
+ session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(getTestQueue());
+ consumerConnection.start();
+
+ TextMessage message = (TextMessage) consumer.receive(500l);
+ assertNotNull("Message was not received", message);
+ assertEquals("Message was corrupted", messageText, message.getText());
+ assertEquals("Header was corrupted", "foo", message.getStringProperty("bar"));
+
+ // get the number of bytes sent by the broker
+ connectionRestOutput = _restTestHelper.getJsonAsList("/api/latest/connection");
+ assertEquals(1, connectionRestOutput.size());
+ statistics = (Map) connectionRestOutput.get(0).get("statistics");
+ int bytesOut = (Integer) statistics.get("bytesOut");
+
+ // if receiving compressed the bytes out statistic from the connection should reflect the compressed size of the
+ // message
+ if(receiverUncompresses && brokerCompressionEnabled)
+ {
+ assertTrue("Message was not received compressed", bytesOut < messageText.length());
+ }
+ else
+ {
+ assertFalse("Message was incorrectly received compressed", bytesOut < messageText.length());
+ }
+
+ consumerConnection.close();
+ }
+
+ private String createMessageText()
+ {
+ StringBuilder stringBuilder = new StringBuilder();
+ while(stringBuilder.length() < 2048*1024)
+ {
+ stringBuilder.append("This should compress easily. ");
+ }
+ return stringBuilder.toString();
+ }
+
+ private Connection getConnection(final boolean compress) throws URLSyntaxException, NamingException, JMSException
+ {
+ AMQConnectionURL url = new AMQConnectionURL(getConnectionFactory().getConnectionURLString());
+
+ url.setOption(ConnectionURL.OPTIONS_COMPRESS_MESSAGES,String.valueOf(compress));
+ url = new AMQConnectionURL(url.toString());
+ url.setUsername(GUEST_USERNAME);
+ url.setPassword(GUEST_PASSWORD);
+ url.setOption(ConnectionURL.OPTIONS_COMPRESS_MESSAGES,String.valueOf(compress));
+ return getConnection(url);
+ }
+
+}