diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-16 15:21:41 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-16 15:21:41 +0000 |
commit | 76cb0102b45908480751581895af1aea251182ac (patch) | |
tree | 69b1f052c7781327bcbb832a77b8668c9041ac91 | |
parent | 579be5dc252220e02e6988458644ea2e60d39f0f (diff) | |
download | qpid-python-76cb0102b45908480751581895af1aea251182ac.tar.gz |
QPID-6000 : [Java Broker] [Java Client] add the ability to configure automatic message compression
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1618375 13f79535-47bb-0310-9956-ffa450edef68
40 files changed, 1205 insertions, 219 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java index f8585344b0..b7be1bfd9b 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java @@ -50,7 +50,7 @@ public interface ConsumerTarget AMQSessionModel getSessionModel(); - void send(MessageInstance entry, boolean batch); + long send(MessageInstance entry, boolean batch); void flushBatched(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java index 982ebb01c6..1a9390f210 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java @@ -128,6 +128,18 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL @ManagedAttribute( defaultValue = "false") boolean getStatisticsReportingResetEnabled(); + String BROKER_MESSAGE_COMPRESSION_ENABLED = "broker.messageCompressionEnabled"; + @ManagedContextDefault(name = BROKER_MESSAGE_COMPRESSION_ENABLED) + boolean DEFAULT_MESSAGE_COMPRESSION_ENABLED = true; + + @ManagedAttribute( defaultValue = "${"+ BROKER_MESSAGE_COMPRESSION_ENABLED +"}") + boolean isMessageCompressionEnabled(); + + String MESSAGE_COMPRESSION_THRESHOLD_SIZE = "connection.messageCompressionThresholdSize"; + @ManagedContextDefault(name = MESSAGE_COMPRESSION_THRESHOLD_SIZE) + int DEFAULT_MESSAGE_COMPRESSION_THRESHOLD_SIZE = 102400; + + @DerivedAttribute( persist = true ) String getModelVersion(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java index 7a965c19d7..5b3965904e 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java @@ -43,6 +43,7 @@ public interface Connection<X extends Connection<X>> extends ConfiguredObject<X> String TRANSPORT = "transport"; String PORT = "port"; + @DerivedAttribute String getClientId(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java index 67c713e9d9..af46bae1c4 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java @@ -92,6 +92,8 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple private int _statisticsReportingPeriod; @ManagedAttributeField private boolean _statisticsReportingResetEnabled; + @ManagedAttributeField + private boolean _messageCompressionEnabled; private State _state = State.UNINITIALIZED; @@ -360,6 +362,12 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple } @Override + public boolean isMessageCompressionEnabled() + { + return _messageCompressionEnabled; + } + + @Override public String getModelVersion() { return BrokerModel.MODEL_VERSION; diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java index d80aa92007..4044c938db 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java @@ -471,9 +471,8 @@ class QueueConsumerImpl public final void send(final QueueEntry entry, final boolean batch) { _deliveredCount.incrementAndGet(); - ServerMessage message = entry.getMessage(); - _deliveredBytes.addAndGet(message.getSize()); - _target.send(entry, batch); + long size = _target.send(entry, batch); + _deliveredBytes.addAndGet(size); } @Override diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index 8d025c50dc..ad33ecadcf 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -167,13 +167,15 @@ public class MockConsumer implements ConsumerTarget { } - public void send(MessageInstance entry, boolean batch) + public long send(MessageInstance entry, boolean batch) { + long size = entry.getMessage().getSize(); if (messages.contains(entry)) { entry.setRedelivered(); } messages.add(entry); + return size; } public void flushBatched() diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java index ce1c95e674..f13886d2b2 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java @@ -208,10 +208,11 @@ public class StandardQueueTest extends AbstractQueueTestBase * @param entry * @param batch */ - public void send(MessageInstance entry, boolean batch) + public long send(MessageInstance entry, boolean batch) { - super.send(entry, batch); + long size = super.send(entry, batch); latch.countDown(); + return size; } }; diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index 7ab3fbb1f5..ec0c38ec42 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_10; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -51,6 +52,7 @@ import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.Option; +import org.apache.qpid.util.GZIPUtils; public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener { @@ -198,7 +200,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC private final AddMessageDispositionListenerAction _postIdSettingAction; - public void send(final MessageInstance entry, boolean batch) + public long send(final MessageInstance entry, boolean batch) { ServerMessage serverMsg = entry.getMessage(); @@ -264,11 +266,44 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC deliveryProps.setRedelivered(entry.isRedelivered()); - Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties()); + boolean msgCompressed = messageProps != null && GZIPUtils.GZIP_CONTENT_ENCODING.equals(messageProps.getContentEncoding()); + + ByteBuffer body = msg.getBody(); - xfr = batch ? new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody(), BATCHED) - : new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody()); + boolean compressionSupported = _session.getConnection().getConnectionDelegate().isCompressionSupported(); + + if(msgCompressed && !compressionSupported) + { + byte[] uncompressed = GZIPUtils.uncompressBufferToArray(body); + if(uncompressed != null) + { + messageProps.setContentEncoding(null); + body = ByteBuffer.wrap(uncompressed); + } + } + else if(!msgCompressed + && compressionSupported + && (messageProps == null || messageProps.getContentEncoding()==null) + && body.remaining() > _session.getConnection().getMessageCompressionThreshold()) + { + byte[] compressed = GZIPUtils.compressBufferToArray(body); + if(compressed != null) + { + if(messageProps == null) + { + messageProps = new MessageProperties(); + } + messageProps.setContentEncoding(GZIPUtils.GZIP_CONTENT_ENCODING); + body = ByteBuffer.wrap(compressed); + } + } + long size = body == null ? 0 : body.remaining(); + + Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties()); + + xfr = batch ? new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header, body, BATCHED) + : new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header, body); if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED) { @@ -311,7 +346,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC { recordUnacknowledged(entry); } - + return size; } void recordUnacknowledged(MessageInstance entry) diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 8ddd04f51a..60bb5c6112 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -74,7 +74,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; private final long _connectionId; private final Object _reference = new Object(); - private VirtualHostImpl _virtualHost; + private VirtualHostImpl<?,?,?> _virtualHost; private Port<?> _port; private AtomicLong _lastIoTime = new AtomicLong(); private boolean _blocking; @@ -87,6 +87,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S new CopyOnWriteArrayList<SessionModelListener>(); private volatile boolean _stopped; + private int _messageCompressionThreshold; public ServerConnection(final long connectionId, Broker broker) { @@ -172,14 +173,22 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S super.setConnectionDelegate(delegate); } - public VirtualHostImpl getVirtualHost() + public VirtualHostImpl<?,?,?> getVirtualHost() { return _virtualHost; } - public void setVirtualHost(VirtualHostImpl virtualHost) + public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost) { _virtualHost = virtualHost; + _messageCompressionThreshold = + virtualHost.getContextValue(Integer.class, + Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE); + + if(_messageCompressionThreshold <= 0) + { + _messageCompressionThreshold = Integer.MAX_VALUE; + } } @Override @@ -639,4 +648,9 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { _taskList.remove(task); } + + public int getMessageCompressionThreshold() + { + return _messageCompressionThreshold; + } } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index bab2d802e8..cc9d66756b 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -64,6 +64,8 @@ public class ServerConnectionDelegate extends ServerDelegate private final SubjectCreator _subjectCreator; private int _maximumFrameSize; + private boolean _compressionSupported; + public ServerConnectionDelegate(Broker<?> broker, String localFQDN, SubjectCreator subjectCreator) { this(createConnectionProperties(broker), Collections.singletonList((Object)"en_US"), broker, localFQDN, subjectCreator); @@ -111,6 +113,7 @@ public class ServerConnectionDelegate extends ServerDelegate map.put(ServerPropertyNames.VERSION, QpidProperties.getReleaseVersion()); map.put(ServerPropertyNames.QPID_BUILD, QpidProperties.getBuildVersion()); map.put(ServerPropertyNames.QPID_INSTANCE_NAME, broker.getName()); + map.put(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED, String.valueOf(broker.isMessageCompressionEnabled())); return map; } @@ -366,6 +369,16 @@ public class ServerConnectionDelegate extends ServerDelegate public void connectionStartOk(Connection conn, ConnectionStartOk ok) { _clientProperties = ok.getClientProperties(); + if(_clientProperties != null) + { + Object compressionSupported = + _clientProperties.get(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED); + if (compressionSupported != null) + { + _compressionSupported = Boolean.parseBoolean(String.valueOf(compressionSupported)); + + } + } super.connectionStartOk(conn, ok); } @@ -400,4 +413,9 @@ public class ServerConnectionDelegate extends ServerDelegate int delay = (Integer)_broker.getAttribute(Broker.CONNECTION_HEART_BEAT_DELAY); return delay == 0 ? super.getHeartbeatMax() : delay; } + + public boolean isCompressionSupported() + { + return _compressionSupported && _broker.isMessageCompressionEnabled(); + } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 1c264e52c6..c193491e1e 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -103,7 +103,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private String _clientProduct = null; private String _remoteProcessPid = null; - private VirtualHostImpl _virtualHost; + private VirtualHostImpl<?,?,?> _virtualHost; private final Map<Integer, AMQChannel<AMQProtocolEngine>> _channelMap = new HashMap<Integer, AMQChannel<AMQProtocolEngine>>(); @@ -175,6 +175,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private volatile boolean _stopped; private long _readBytes; private boolean _authenticated; + private boolean _compressionSupported; + private int _messageCompressionThreshold; public AMQProtocolEngine(Broker broker, final NetworkConnection network, @@ -208,7 +210,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return null; } }); - + _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID()); _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID()); _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID()); @@ -539,6 +541,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _broker.getName()); serverProperties.setString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, String.valueOf(_closeWhenNoRoute)); + serverProperties.setString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED, + String.valueOf(_broker.isMessageCompressionEnabled())); AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(), (short) pv.getActualMinorVersion(), @@ -1131,6 +1135,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _logger.debug("Client set closeWhenNoRoute=" + _closeWhenNoRoute + " for protocol engine " + this); } } + String compressionSupported = clientProperties.getString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED); + if (compressionSupported != null) + { + _compressionSupported = Boolean.parseBoolean(compressionSupported); + if(_logger.isDebugEnabled()) + { + _logger.debug("Client set compressionSupported=" + _compressionSupported + " for protocol engine " + this); + } + } _clientVersion = clientProperties.getString(ConnectionStartProperties.VERSION_0_8); _clientProduct = clientProperties.getString(ConnectionStartProperties.PRODUCT); @@ -1181,17 +1194,24 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return getMethodRegistry(); } - public VirtualHostImpl getVirtualHost() + public VirtualHostImpl<?,?,?> getVirtualHost() { return _virtualHost; } - public void setVirtualHost(VirtualHostImpl virtualHost) throws AMQException + public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost) throws AMQException { _virtualHost = virtualHost; _virtualHost.getConnectionRegistry().registerConnection(this); + + _messageCompressionThreshold = virtualHost.getContextValue(Integer.class, + Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE); + if(_messageCompressionThreshold <= 0) + { + _messageCompressionThreshold = Integer.MAX_VALUE; + } } public void addDeleteTask(Action<? super AMQProtocolEngine> task) @@ -1595,15 +1615,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } @Override - public void deliverToClient(final ConsumerImpl sub, final ServerMessage message, + public long deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag) { - registerMessageDelivered(message.getSize()); - _protocolOutputConverter.writeDeliver(message, + long size = _protocolOutputConverter.writeDeliver(message, props, _channelId, deliveryTag, new AMQShortString(sub.getName())); + registerMessageDelivered(size); + return size; } } @@ -1636,6 +1657,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return _closeWhenNoRoute; } + @Override + public boolean isCompressionSupported() + { + return _compressionSupported && _broker.isMessageCompressionEnabled(); + } + + @Override + public int getMessageCompressionThreshold() + { + return _messageCompressionThreshold; + } + public EventLogger getEventLogger() { if(_virtualHost != null) diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java index bab0aaf3da..8d5142338a 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java @@ -174,9 +174,9 @@ public interface AMQProtocolSession<T extends AMQProtocolSession<T>> Object getReference(); - VirtualHostImpl getVirtualHost(); + VirtualHostImpl<?,?,?> getVirtualHost(); - void setVirtualHost(VirtualHostImpl virtualHost) throws AMQException; + void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost) throws AMQException; public ProtocolOutputConverter getProtocolOutputConverter(); @@ -210,4 +210,8 @@ public interface AMQProtocolSession<T extends AMQProtocolSession<T>> * can't be routed rather than returning the message. */ boolean isCloseWhenNoRoute(); + + boolean isCompressionSupported(); + + int getMessageCompressionThreshold(); } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java index fa26a73f93..c7871e8b9a 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java @@ -26,6 +26,6 @@ import org.apache.qpid.server.message.ServerMessage; public interface ClientDeliveryMethod { - void deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props, + long deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag); } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 7c2efe64e6..d5eed242e7 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -116,7 +116,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * @throws org.apache.qpid.AMQException */ @Override - public void send(MessageInstance entry, boolean batch) + public long send(MessageInstance entry, boolean batch) { // We don't decrement the reference here as we don't want to consume the message // but we do want to send it to the client. @@ -124,7 +124,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen synchronized (getChannel()) { long deliveryTag = getChannel().getNextDeliveryTag(); - sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); + return sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); } } @@ -177,7 +177,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * @param batch */ @Override - public void send(MessageInstance entry, boolean batch) + public long send(MessageInstance entry, boolean batch) { // if we do not need to wait for client acknowledgements // we can decrement the reference count immediately. @@ -194,17 +194,17 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen MessageReference ref = message.newReference(); InstanceProperties props = entry.getInstanceProperties(); entry.delete(); - + long size; synchronized (getChannel()) { getChannel().getProtocolSession().setDeferFlush(batch); long deliveryTag = getChannel().getNextDeliveryTag(); - sendToClient(message, props, deliveryTag); + size = sendToClient(message, props, deliveryTag); } ref.release(); - + return size; } @@ -291,7 +291,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * @param batch */ @Override - public void send(MessageInstance entry, boolean batch) + public long send(MessageInstance entry, boolean batch) { @@ -303,9 +303,9 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen addUnacknowledgedMessage(entry); recordMessageDelivery(entry, deliveryTag); entry.addStateChangeListener(getReleasedStateChangeListener()); - sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); + long size = sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); entry.incrementDeliveryCount(); - + return size; } } @@ -502,9 +502,9 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } } - protected void sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag) + protected long sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag) { - _deliveryMethod.deliverToClient(getConsumer(), message, props, deliveryTag); + return _deliveryMethod.deliverToClient(getConsumer(), message, props, deliveryTag); } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index 0026bad063..c3bdedf44d 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -21,6 +21,9 @@ package org.apache.qpid.server.protocol.v0_8.handler; +import java.security.AccessControlException; +import java.util.EnumSet; + import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -30,26 +33,23 @@ import org.apache.qpid.framing.BasicGetEmptyBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.flow.FlowCreditManager; +import org.apache.qpid.server.flow.MessageOnlyCreditManager; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.flow.MessageOnlyCreditManager; import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod; import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8; -import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod; -import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import java.security.AccessControlException; -import java.util.EnumSet; - public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody> { private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class); @@ -202,17 +202,18 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB } @Override - public void deliverToClient(final ConsumerImpl sub, final ServerMessage message, + public long deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag) { _singleMessageCredit.useCreditForMessage(message.getSize()); - _session.getProtocolOutputConverter().writeGetOk(message, + long size =_session.getProtocolOutputConverter().writeGetOk(message, props, _channel.getChannelId(), deliveryTag, _queue.getQueueDepthMessages()); _deliveredMessage = true; + return size; } public boolean hasDeliveredMessage() diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java index 7678ce812b..4ee5cbc17d 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java @@ -26,7 +26,6 @@ */ package org.apache.qpid.server.protocol.v0_8.output; -import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; @@ -35,7 +34,6 @@ import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueEntry; public interface ProtocolOutputConverter { @@ -46,12 +44,12 @@ public interface ProtocolOutputConverter ProtocolOutputConverter newInstance(AMQProtocolSession session); } - void writeDeliver(final ServerMessage msg, + long writeDeliver(final ServerMessage msg, final InstanceProperties props, int channelId, long deliveryTag, AMQShortString consumerTag); - void writeGetOk(final ServerMessage msg, + long writeGetOk(final ServerMessage msg, final InstanceProperties props, int channelId, long deliveryTag, diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java index f786cb113a..9e41f7884c 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java @@ -20,6 +20,10 @@ */ package org.apache.qpid.server.protocol.v0_8.output; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; @@ -27,6 +31,7 @@ import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicCancelOkBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.BasicGetOkBody; import org.apache.qpid.framing.BasicReturnBody; import org.apache.qpid.framing.ContentHeaderBody; @@ -34,16 +39,13 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.protocol.v0_8.AMQMessage; -import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; - -import java.io.DataOutput; -import java.io.IOException; -import java.nio.ByteBuffer; +import org.apache.qpid.util.GZIPUtils; class ProtocolOutputConverterImpl implements ProtocolOutputConverter { @@ -51,6 +53,7 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter private final MethodRegistry _methodRegistry; private final AMQProtocolSession _protocolSession; + private static final AMQShortString GZIP_ENCODING = AMQShortString.valueOf(GZIPUtils.GZIP_CONTENT_ENCODING); ProtocolOutputConverterImpl(AMQProtocolSession session, MethodRegistry methodRegistry) { @@ -64,7 +67,7 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter return _protocolSession; } - public void writeDeliver(final ServerMessage m, + public long writeDeliver(final ServerMessage m, final InstanceProperties props, int channelId, long deliveryTag, AMQShortString consumerTag) @@ -72,7 +75,7 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter final AMQMessage msg = convertToAMQMessage(m); final boolean isRedelivered = Boolean.TRUE.equals(props.getProperty(InstanceProperties.Property.REDELIVERED)); AMQBody deliverBody = createEncodedDeliverBody(msg, isRedelivered, deliveryTag, consumerTag); - writeMessageDelivery(msg, channelId, deliverBody); + return writeMessageDelivery(msg, channelId, deliverBody); } private AMQMessage convertToAMQMessage(ServerMessage serverMessage) @@ -93,21 +96,97 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter return MessageConverterRegistry.getConverter(clazz, AMQMessage.class); } - private void writeMessageDelivery(AMQMessage message, int channelId, AMQBody deliverBody) + private long writeMessageDelivery(AMQMessage message, int channelId, AMQBody deliverBody) { - writeMessageDelivery(message, message.getContentHeaderBody(), channelId, deliverBody); + return writeMessageDelivery(message, message.getContentHeaderBody(), channelId, deliverBody); } - private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody) + private long writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody) { - int bodySize = (int) message.getSize(); + boolean msgCompressed = isCompressed(contentHeaderBody); + byte[] modifiedContent; + + // straight through case + boolean compressionSupported = _protocolSession.isCompressionSupported(); + + if(msgCompressed && !compressionSupported && + (modifiedContent = GZIPUtils.uncompressBufferToArray(message.getContent(0,bodySize))) != null) + { + BasicContentHeaderProperties modifiedProps = + new BasicContentHeaderProperties(contentHeaderBody.getProperties()); + modifiedProps.setEncoding((String)null); + + writeMessageDeliveryModified(channelId, deliverBody, modifiedProps, modifiedContent); + + return modifiedContent.length; + } + else if(!msgCompressed + && compressionSupported + && contentHeaderBody.getProperties().getEncoding()==null + && bodySize > _protocolSession.getMessageCompressionThreshold() + && (modifiedContent = GZIPUtils.compressBufferToArray(message.getContent(0, bodySize))) != null) + { + BasicContentHeaderProperties modifiedProps = + new BasicContentHeaderProperties(contentHeaderBody.getProperties()); + modifiedProps.setEncoding(GZIP_ENCODING); + + writeMessageDeliveryModified(channelId, deliverBody, modifiedProps, modifiedContent); + + return modifiedContent.length; + } + else + { + writeMessageDeliveryUnchanged(message, contentHeaderBody, channelId, deliverBody, bodySize); + + return bodySize; + } + } - if(bodySize == 0) + private int writeMessageDeliveryModified(final int channelId, + final AMQBody deliverBody, + final BasicContentHeaderProperties modifiedProps, + final byte[] content) + { + final int bodySize; + bodySize = content.length; + ContentHeaderBody modifiedHeaderBody = + new ContentHeaderBody(BASIC_CLASS_ID, 0, modifiedProps, bodySize); + final MessageContentSource wrappedSource = new MessageContentSource() + { + @Override + public int getContent(final ByteBuffer buf, final int offset) + { + int size = Math.min(buf.remaining(), content.length - offset); + buf.put(content, offset, size); + return size; + } + + @Override + public ByteBuffer getContent(final int offset, final int size) + { + return ByteBuffer.wrap(content, offset, size); + } + + @Override + public long getSize() + { + return content.length; + } + }; + writeMessageDeliveryUnchanged(wrappedSource, modifiedHeaderBody, channelId, deliverBody, bodySize); + return bodySize; + } + + private void writeMessageDeliveryUnchanged(final MessageContentSource message, + final ContentHeaderBody contentHeaderBody, + final int channelId, final AMQBody deliverBody, final int bodySize) + { + if (bodySize == 0) { SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody, - contentHeaderBody); + contentHeaderBody); writeFrame(compositeBlock); } @@ -120,13 +199,14 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter int writtenSize = capacity; - AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity); + AMQBody firstContentBody = new MessageContentSourceBody(message, 0, capacity); CompositeAMQBodyBlock - compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody); + compositeBlock = + new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody); writeFrame(compositeBlock); - while(writtenSize < bodySize) + while (writtenSize < bodySize) { capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize; MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity); @@ -137,6 +217,11 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter } } + private boolean isCompressed(final ContentHeaderBody contentHeaderBody) + { + return GZIP_ENCODING.equals(contentHeaderBody.getProperties().getEncoding()); + } + private class MessageContentSourceBody implements AMQBody { public static final byte TYPE = 3; @@ -186,14 +271,14 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter } } - public void writeGetOk(final ServerMessage msg, + public long writeGetOk(final ServerMessage msg, final InstanceProperties props, int channelId, long deliveryTag, int queueSize) { AMQBody deliver = createEncodedGetOkBody(msg, props, deliveryTag, queueSize); - writeMessageDelivery(convertToAMQMessage(msg), channelId, deliver); + return writeMessageDelivery(convertToAMQMessage(msg), channelId, deliver); } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java index 7f4a3701cd..05ae5285ad 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java @@ -141,13 +141,13 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr { } - public void writeDeliver(final ServerMessage msg, + public long writeDeliver(final ServerMessage msg, final InstanceProperties props, int channelId, long deliveryTag, AMQShortString consumerTag) { _deliveryCount.incrementAndGet(); - + long size = msg.getSize(); synchronized (_channelDelivers) { Map<String, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId); @@ -168,14 +168,16 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr consumerDelivers.add(new DeliveryPair(deliveryTag, msg)); } + return size; } - public void writeGetOk(final ServerMessage msg, + public long writeGetOk(final ServerMessage msg, final InstanceProperties props, int channelId, long deliveryTag, int queueSize) { + return msg.getSize(); } public void awaitDelivery(int msgs) @@ -244,11 +246,11 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr @Override - public void deliverToClient(ConsumerImpl sub, ServerMessage message, + public long deliverToClient(ConsumerImpl sub, ServerMessage message, InstanceProperties props, long deliveryTag) { _deliveryCount.incrementAndGet(); - + long size = message.getSize(); synchronized (_channelDelivers) { Map<String, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId); @@ -269,6 +271,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr consumerDelivers.add(new DeliveryPair(deliveryTag, message)); } + return size; } } diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index bceae85896..918a890af5 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -112,10 +112,12 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget } } - public void send(MessageInstance entry, boolean batch) + public long send(MessageInstance entry, boolean batch) { // TODO + long size = entry.getMessage().getSize(); send(entry); + return size; } public void flushBatched() diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java index 5b9bdc7244..3572b98cad 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java @@ -32,6 +32,7 @@ import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl; import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.Symbol; import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; import org.apache.qpid.amqp_1_0.type.messaging.Data; @@ -43,6 +44,7 @@ import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.transport.codec.BBDecoder; import org.apache.qpid.typedmessage.TypedBytesContentReader; import org.apache.qpid.typedmessage.TypedBytesFormatException; +import org.apache.qpid.util.GZIPUtils; public abstract class MessageConverter_to_1_0<M extends ServerMessage> implements MessageConverter<M, Message_1_0> { @@ -202,7 +204,19 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement SectionEncoder sectionEncoder) { final String mimeType = serverMessage.getMessageHeader().getMimeType(); - Section bodySection = getBodySection(serverMessage, mimeType); + byte[] data = new byte[(int) serverMessage.getSize()]; + serverMessage.getContent(ByteBuffer.wrap(data), 0); + byte[] uncompressed; + + if(Symbol.valueOf(GZIPUtils.GZIP_CONTENT_ENCODING).equals(metaData.getPropertiesSection().getContentEncoding()) + && (uncompressed = GZIPUtils.uncompressBufferToArray(ByteBuffer.wrap(data)))!=null) + { + data = uncompressed; + metaData.getPropertiesSection().setContentEncoding(null); + } + + + Section bodySection = convertMessageBody(mimeType, data); final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection, sectionEncoder); @@ -279,14 +293,6 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement }; } - protected Section getBodySection(final M serverMessage, final String mimeType) - { - byte[] data = new byte[(int) serverMessage.getSize()]; - serverMessage.getContent(ByteBuffer.wrap(data), 0); - - return convertMessageBody(mimeType, data); - } - private ByteBuffer encodeConvertedMessage(MessageMetaData_1_0 metaData, Section bodySection, SectionEncoder sectionEncoder) { int headerSize = (int) metaData.getStorableSize(); diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java index 4540308f61..fbc24ba454 100755 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java @@ -72,6 +72,17 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData this(sections, encodeSections(sections, encoder)); } + public Properties getPropertiesSection() + { + return _properties; + } + + + public Header getHeaderSection() + { + return _header; + } + private static ArrayList<ByteBuffer> encodeSections(final List<Section> sections, final SectionEncoder encoder) { ArrayList<ByteBuffer> encodedSections = new ArrayList<ByteBuffer>(sections.size()); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 7d9dfcd600..b64d355f80 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -20,6 +20,39 @@ */ package org.apache.qpid.client; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.ConnectException; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.nio.channels.UnresolvedAddressException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.ConnectionConsumer; +import javax.jms.ConnectionMetaData; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueSession; +import javax.jms.ServerSessionPool; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicSession; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import javax.naming.StringRefAddr; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,38 +82,6 @@ import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.URLSyntaxException; -import javax.jms.ConnectionConsumer; -import javax.jms.ConnectionMetaData; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.IllegalStateException; -import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.QueueConnection; -import javax.jms.QueueSession; -import javax.jms.ServerSessionPool; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicSession; -import javax.naming.NamingException; -import javax.naming.Reference; -import javax.naming.Referenceable; -import javax.naming.StringRefAddr; -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.net.ConnectException; -import java.net.SocketAddress; -import java.net.UnknownHostException; -import java.nio.channels.UnresolvedAddressException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); @@ -191,6 +192,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect //Address resolution purposes private volatile long _lastFailoverTime = 0; + private boolean _compressMessages; + private int _messageCompressionThresholdSize; + /** * @param broker brokerdetails * @param username username @@ -325,6 +329,31 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect Boolean.parseBoolean(System.getProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "false")); } + if(connectionURL.getOption(ConnectionURL.OPTIONS_COMPRESS_MESSAGES) != null) + { + _compressMessages = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_COMPRESS_MESSAGES)); + } + else + { + _compressMessages = + Boolean.parseBoolean(System.getProperty(ClientProperties.CONNECTION_OPTION_COMPRESS_MESSAGES, + String.valueOf(ClientProperties.DEFAULT_CONNECTION_OPTION_COMPRESS_MESSAGES))); + } + + + if(connectionURL.getOption(ConnectionURL.OPTIONS_MESSAGES_COMPRESSION_THRESHOLD_SIZE) != null) + { + _messageCompressionThresholdSize = Integer.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_MESSAGES_COMPRESSION_THRESHOLD_SIZE)); + } + else + { + _messageCompressionThresholdSize = Integer.getInteger(ClientProperties.CONNECTION_OPTION_MESSAGE_COMPRESSION_THRESHOLD_SIZE, + ClientProperties.DEFAULT_MESSAGE_COMPRESSION_THRESHOLD_SIZE); + } + if(_messageCompressionThresholdSize <= 0) + { + _messageCompressionThresholdSize = Integer.MAX_VALUE; + } String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10"); if (_logger.isDebugEnabled()) @@ -449,16 +478,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - if ((message == null) || message.equals("")) + if (message == null) { - if (message == null) - { - message = "Unable to Connect"; - } - else // can only be "" if getMessage() returned it therfore lastException != null - { - message = "Unable to Connect:" + connectionException.getClass(); - } + message = "Unable to Connect"; + } + else if("".equals(message)) + { + message = "Unable to Connect:" + connectionException.getClass(); } for (Throwable th = connectionException; th != null; th = th.getCause()) @@ -1543,6 +1569,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _syncPublish; } + public boolean isMessageCompressionDesired() + { + return _compressMessages; + } + public int getNextChannelID() { return _sessions.getNextChannelId(); @@ -1615,4 +1646,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return super.setClosed(); } + + public int getMessageCompressionThresholdSize() + { + return _messageCompressionThresholdSize; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 0329deee03..74ca1ed74f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -20,6 +20,11 @@ */ package org.apache.qpid.client; +import java.io.IOException; + +import javax.jms.JMSException; +import javax.jms.XASession; + import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -27,10 +32,6 @@ import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Session; -import javax.jms.JMSException; -import javax.jms.XASession; -import java.io.IOException; - public interface AMQConnectionDelegate { ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException; @@ -82,4 +83,6 @@ public interface AMQConnectionDelegate void setHeartbeatListener(HeartbeatListener listener); boolean supportsIsBound(); + + boolean isMessageCompressionSupported(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 95b1178407..4e9164c3b0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -21,6 +21,17 @@ package org.apache.qpid.client; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.XASession; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,7 +40,6 @@ import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.transport.ClientConnectionDelegate; import org.apache.qpid.common.ServerPropertyNames; - import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ChannelLimitReachedException; @@ -48,16 +58,6 @@ import org.apache.qpid.transport.SessionDetachCode; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.TransportException; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.XASession; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; - public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ConnectionListener { /** @@ -441,7 +441,11 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec try { clientProps.put(ConnectionStartProperties.CLIENT_ID_0_10, _conn.getClientID()); - conSettings.setClientProperties(clientProps); + if(_conn.isMessageCompressionDesired()) + { + clientProps.put(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED, Boolean.TRUE.toString()); + } + conSettings.setClientProperties(clientProps); } catch (JMSException e) { @@ -504,4 +508,10 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec //0-10 supports the isBound method return true; } + + @Override + public boolean isMessageCompressionSupported() + { + return _qpidConnection.isMessageCompressionSupported(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index dfbf7ec60a..5242629a91 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -20,8 +20,21 @@ */ package org.apache.qpid.client; +import java.io.IOException; +import java.net.ConnectException; +import java.nio.channels.UnresolvedAddressException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.Iterator; +import java.util.Set; + +import javax.jms.JMSException; +import javax.jms.XASession; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -49,22 +62,11 @@ import org.apache.qpid.transport.network.Transport; import org.apache.qpid.transport.network.security.SecurityLayer; import org.apache.qpid.transport.network.security.SecurityLayerFactory; -import javax.jms.JMSException; -import javax.jms.XASession; - -import java.io.IOException; -import java.net.ConnectException; -import java.nio.channels.UnresolvedAddressException; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.Iterator; -import java.util.Set; - public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class); private final AMQConnection _conn; + private boolean _messageCompressionSupported; public void closeConnection(long timeout) throws JMSException, AMQException { @@ -139,6 +141,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate _conn.getFailoverPolicy().attainedConnection(); _conn.setConnected(true); _conn.logConnected(network.getLocalAddress(), network.getRemoteAddress()); + _messageCompressionSupported = checkMessageCompressionSupported(); return null; } else @@ -413,4 +416,17 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate return connectedToQpid; } + + private boolean checkMessageCompressionSupported() + { + FieldTable serverProperties = _conn.getProtocolHandler().getProtocolSession().getConnectionStartServerProperties(); + return serverProperties != null + && Boolean.parseBoolean(serverProperties.getString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED)); + + } + + public boolean isMessageCompressionSupported() + { + return _messageCompressionSupported; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 9efc670e99..8ce3d662d4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -17,6 +17,19 @@ */ package org.apache.qpid.client; +import static org.apache.qpid.transport.Option.NONE; +import static org.apache.qpid.transport.Option.SYNC; +import static org.apache.qpid.transport.Option.UNRELIABLE; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,25 +49,18 @@ import org.apache.qpid.transport.MessageDeliveryPriority; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.Option; import org.apache.qpid.transport.TransportException; +import org.apache.qpid.util.GZIPUtils; import org.apache.qpid.util.Strings; -import static org.apache.qpid.transport.Option.NONE; -import static org.apache.qpid.transport.Option.SYNC; -import static org.apache.qpid.transport.Option.UNRELIABLE; - -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.Message; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - /** * This is a 0_10 message producer. */ public class BasicMessageProducer_0_10 extends BasicMessageProducer { + + // TODO - move and add properties to change this + private static final int MESSAGE_COMPRESSION_THRESHOLD_SIZE = 4096; + private static final Logger _logger = LoggerFactory.getLogger(BasicMessageProducer_0_10.class); private byte[] userIDBytes; @@ -204,6 +210,19 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer } ByteBuffer data = message.getData(); + + if(data.remaining() > getConnection().getMessageCompressionThresholdSize() && getConnection().getDelegate().isMessageCompressionSupported() + && getConnection().isMessageCompressionDesired() && messageProps.getContentEncoding() == null) + { + byte[] compressed = GZIPUtils.compressBufferToArray(data); + if(compressed != null) + { + messageProps.setContentEncoding(GZIPUtils.GZIP_CONTENT_ENCODING); + data = ByteBuffer.wrap(compressed); + } + } + + messageProps.setContentLength(data == null ? 0 : data.remaining()); // send the message diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index b9bb03444f..fedb8e088c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -20,8 +20,11 @@ */ package org.apache.qpid.client; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.UUID; +import java.util.zip.GZIPOutputStream; import javax.jms.JMSException; import javax.jms.Message; @@ -147,7 +150,37 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer contentHeaderProperties.setDeliveryMode((byte) deliveryMode); contentHeaderProperties.setPriority((byte) priority); - final int size = (payload != null) ? payload.limit() : 0; + int size = (payload != null) ? payload.remaining() : 0; + + if(size > getConnection().getMessageCompressionThresholdSize() && getConnection().getDelegate().isMessageCompressionSupported() + && getConnection().isMessageCompressionDesired() && contentHeaderProperties.getEncoding() == null) + { + contentHeaderProperties.setEncoding("gzip"); + try(ByteArrayOutputStream compressedOutputBuffer = new ByteArrayOutputStream(size / 2)) + { + try (GZIPOutputStream output = new GZIPOutputStream(compressedOutputBuffer)) + { + if(payload.hasArray()) + { + output.write(payload.array(),payload.position()+payload.arrayOffset(),payload.remaining()); + } + else + { + byte[] tmp = new byte[size]; + payload.get(tmp); + output.write(tmp); + } + } + + byte[] compressedData = compressedOutputBuffer.toByteArray(); + payload = ByteBuffer.wrap(compressedData); + size = compressedData.length; + } + catch (IOException e) + { + // TODO - shouldn't happen + } + } final int contentBodyFrameCount = calculateContentBodyFrameCount(payload); final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount]; diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java b/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java index baae072167..e8343fda0a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java @@ -18,11 +18,12 @@ */ package org.apache.qpid.client.handler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.properties.ConnectionStartProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Used during connection establishment to optionally set the "close when no route" client property diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index b0c30f82fa..2e817f2966 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -20,6 +20,13 @@ */ package org.apache.qpid.client.handler; +import java.io.UnsupportedEncodingException; +import java.util.StringTokenizer; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,12 +47,6 @@ import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.properties.ConnectionStartProperties; -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; -import java.io.UnsupportedEncodingException; -import java.util.StringTokenizer; - public class ConnectionStartMethodHandler implements StateAwareMethodListener<ConnectionStartBody> { private static final Logger _log = LoggerFactory.getLogger(ConnectionStartMethodHandler.class); @@ -173,6 +174,9 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co ConnectionURL url = getConnectionURL(session); _closeWhenNoRouteHelper.setClientProperties(clientProperties, url, serverProperties); + clientProperties.setString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED, + String.valueOf(session.getAMQConnection().isMessageCompressionDesired())); + ConnectionStartOkBody connectionStartOkBody = session.getMethodRegistry().createConnectionStartOkBody(clientProperties,new AMQShortString(mechanism),saslResponse,new AMQShortString(locales)); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. @@ -188,7 +192,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co else { _log.error("Broker requested Protocol [" + body.getVersionMajor() + "-" + body.getVersionMinor() - + "] which is not supported by this version of the client library"); + + "] which is not supported by this version of the client library"); session.closeProtocolSession(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index e52ff9acb2..71d07b1fa0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -20,6 +20,17 @@ */ package org.apache.qpid.client.message; +import java.io.ByteArrayOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import java.util.zip.GZIPInputStream; + +import javax.jms.JMSException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,16 +39,11 @@ import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession_0_8; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.MessageProperties; - -import javax.jms.JMSException; -import java.nio.ByteBuffer; -import java.util.Iterator; -import java.util.List; +import org.apache.qpid.util.GZIPUtils; public abstract class AbstractJMSMessageFactory implements MessageFactory { @@ -52,46 +58,57 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory ByteBuffer data; final boolean debug = _logger.isDebugEnabled(); - // we optimise the non-fragmented case to avoid copying - if ((bodies != null) && (bodies.size() == 1)) - { - if (debug) - { - _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.getBodySize() + ")"); - } + byte[] uncompressed; - data = ByteBuffer.wrap(((ContentBody) bodies.get(0)).getPayload()); + if(GZIPUtils.GZIP_CONTENT_ENCODING.equals(contentHeader.getProperties().getEncodingAsString()) + && (uncompressed = GZIPUtils.uncompressStreamToArray(new BodyInputStream(bodies))) != null ) + { + contentHeader.getProperties().setEncoding((String)null); + data = ByteBuffer.wrap(uncompressed); } - else if (bodies != null) + else { - if (debug) + // we optimise the non-fragmented case to avoid copying + if ((bodies != null) && (bodies.size() == 1)) { - _logger.debug("Fragmented message body (" + bodies - .size() + " frames, bodySize=" + contentHeader.getBodySize() + ")"); - } + if (debug) + { + _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.getBodySize() + ")"); + } - data = ByteBuffer.allocate((int) contentHeader.getBodySize()); // XXX: Is cast a problem? - final Iterator it = bodies.iterator(); - while (it.hasNext()) + data = ByteBuffer.wrap(((ContentBody) bodies.get(0)).getPayload()); + } + else if (bodies != null) { - ContentBody cb = (ContentBody) it.next(); - final ByteBuffer payload = ByteBuffer.wrap(cb.getPayload()); - if(payload.isDirect() || payload.isReadOnly()) + if (debug) { - data.put(payload); + _logger.debug("Fragmented message body (" + bodies + .size() + " frames, bodySize=" + contentHeader.getBodySize() + ")"); } - else + + data = ByteBuffer.allocate((int) contentHeader.getBodySize()); // XXX: Is cast a problem? + final Iterator it = bodies.iterator(); + while (it.hasNext()) { - data.put(payload.array(), payload.arrayOffset(), payload.limit()); + ContentBody cb = (ContentBody) it.next(); + final ByteBuffer payload = ByteBuffer.wrap(cb.getPayload()); + if (payload.isDirect() || payload.isReadOnly()) + { + data.put(payload); + } + else + { + data.put(payload.array(), payload.arrayOffset(), payload.limit()); + } + } + data.flip(); + } + else // bodies == null + { + data = ByteBuffer.allocate(0); } - - data.flip(); - } - else // bodies == null - { - data = ByteBuffer.allocate(0); } if (debug) @@ -132,22 +149,42 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data .remaining()); } + if(GZIPUtils.GZIP_CONTENT_ENCODING.equals(msgProps.getContentEncoding())) + { + byte[] uncompressed = GZIPUtils.uncompressBufferToArray(data); + if(uncompressed != null) + { + msgProps.setContentEncoding(null); + data = ByteBuffer.wrap(uncompressed); + } + } AMQMessageDelegate delegate = new AMQMessageDelegate_0_10(msgProps, deliveryProps, messageNbr); AbstractJMSMessage message = createMessage(delegate, data); return message; } - private static final String asString(byte[] bytes) + private ByteBuffer uncompressBody(final InputStream bodyInputStream) throws AMQException { - if (bytes == null) + final ByteBuffer data; + try(GZIPInputStream gzipInputStream = new GZIPInputStream(bodyInputStream)) { - return null; + ByteArrayOutputStream uncompressedBuffer = new ByteArrayOutputStream(); + int read; + byte[] buf = new byte[4096]; + while((read = gzipInputStream.read(buf))!=-1) + { + uncompressedBuffer.write(buf,0,read); + } + byte[] uncompressedBytes = uncompressedBuffer.toByteArray(); + data = ByteBuffer.wrap(uncompressedBytes); } - else + catch (IOException e) { - return new String(bytes); + // TODO - shouldn't happen + throw new AMQException("Error uncompressing gzipped message data", e); } + return data; } @@ -174,4 +211,57 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory return msg; } + private class BodyInputStream extends InputStream + { + private final Iterator<ContentBody> _bodiesIter; + private byte[] _currentBuffer; + private int _currentPos; + public BodyInputStream(final List<ContentBody> bodies) + { + _bodiesIter = bodies.iterator(); + _currentBuffer = _bodiesIter.next().getPayload(); + _currentPos = 0; + } + + @Override + public int read() throws IOException + { + byte[] buf = new byte[1]; + int size = read(buf); + if(size == -1) + { + throw new EOFException(); + } + else + { + return ((int)buf[0])&0xff; + } + } + + @Override + public int read(final byte[] dst, final int off, final int len) + { + while(_currentPos == _currentBuffer.length) + { + if(!_bodiesIter.hasNext()) + { + return -1; + } + else + { + _currentBuffer = _bodiesIter.next().getPayload(); + _currentPos = 0; + } + } + int size = Math.min(len, _currentBuffer.length - _currentPos); + System.arraycopy(_currentBuffer,_currentPos, dst,off,size); + _currentPos+=size; + return size; + } + + @Override + public void close() + { + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java index 2901a5f983..754b90c372 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java @@ -70,6 +70,11 @@ public interface ConnectionURL */ public static final String OPTIONS_CLOSE_WHEN_NO_ROUTE = "closeWhenNoRoute"; + + public static final String OPTIONS_COMPRESS_MESSAGES = "compressMessages"; + public static final String OPTIONS_MESSAGES_COMPRESSION_THRESHOLD_SIZE = "messageCompressionThresholdSize"; + + public static final String OPTIONS_DEFAULT_TOPIC_EXCHANGE = "defaultTopicExchange"; public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange"; public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange"; diff --git a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java index f10961c092..24ec496cc9 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java @@ -254,6 +254,19 @@ public class ClientProperties public static final String CONNECTION_OPTION_SSL_VERIFY_HOST_NAME = "qpid.connection_ssl_verify_hostname"; public static final boolean DEFAULT_CONNECTION_OPTION_SSL_VERIFY_HOST_NAME = true; + /** + * System property to set a default value for a connection option 'compress_messages' + */ + public static final String CONNECTION_OPTION_COMPRESS_MESSAGES = "qpid.connection_compress_messages"; + public static final boolean DEFAULT_CONNECTION_OPTION_COMPRESS_MESSAGES = false; + + + /** + * System property to set a default value for a connection option 'message_compression_threshold_size' + */ + public static final String CONNECTION_OPTION_MESSAGE_COMPRESSION_THRESHOLD_SIZE = "qpid.message_compression_threshold_size"; + public static final int DEFAULT_MESSAGE_COMPRESSION_THRESHOLD_SIZE = 102400; + private ClientProperties() { //No instances diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java index fe8c94cee1..b490aee898 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.framing; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class BasicContentHeaderProperties { //persistent & non-persistent constants, values as per JMS DeliveryMode @@ -83,8 +83,48 @@ public class BasicContentHeaderProperties private byte[] _encodedForm; + public BasicContentHeaderProperties(BasicContentHeaderProperties other) + { + if(other._headers != null) + { + byte[] encodedHeaders = other._headers.getDataAsBytes(); + + _headers = new FieldTable(encodedHeaders,0,encodedHeaders.length); + + } + + _contentType = other._contentType; + + _encoding = other._encoding; + + _deliveryMode = other._deliveryMode; + + _priority = other._priority; + + _correlationId = other._correlationId; + + _replyTo = other._replyTo; + + _expiration = other._expiration; + + _messageId = other._messageId; + + _timestamp = other._timestamp; + + _type = other._type; + + _userId = other._userId; + + _appId = other._appId; + + _clusterId = other._clusterId; + + _propertyFlags = other._propertyFlags; + } + public BasicContentHeaderProperties() - { } + { + } public int getPropertyListSize() { diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index c4220894a8..9a455ce868 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -84,7 +84,7 @@ public class FieldTable _encodedSize = length; } - public FieldTable(byte[] encodedForm, int offset, int length) throws IOException + public FieldTable(byte[] encodedForm, int offset, int length) { this(); _encodedForm = encodedForm; @@ -858,7 +858,17 @@ public class FieldTable } } - return _encodedForm.clone(); + else if(_encodedFormOffset == 0 && _encodedSize == _encodedForm.length) + { + return _encodedForm.clone(); + } + else + { + byte[] encodedCopy = new byte[(int) _encodedSize]; + System.arraycopy(_encodedForm,_encodedFormOffset,encodedCopy,0,(int)_encodedSize); + return encodedCopy; + } + } public long getEncodedSize() diff --git a/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java b/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java index b2bcc1836e..8f1a1d0be0 100644 --- a/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java +++ b/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java @@ -41,6 +41,9 @@ public class ConnectionStartProperties */ public static final String QPID_CLOSE_WHEN_NO_ROUTE = "qpid.close_when_no_route"; + public static final String QPID_MESSAGE_COMPRESSION_SUPPORTED = "qpid.message_compression_supported"; + + public static final String CLIENT_ID_0_10 = "clientName"; public static final String CLIENT_ID_0_8 = "instance"; diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 44cb30e735..99fc02c959 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -41,6 +41,7 @@ import javax.security.sasl.SaslClient; import javax.security.sasl.SaslServer; import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.transport.network.Assembler; import org.apache.qpid.transport.network.Disassembler; import org.apache.qpid.transport.network.InputHandler; @@ -78,6 +79,7 @@ public class Connection extends ConnectionInvoker private long _lastReadTime; private NetworkConnection _networkConnection; private FrameSizeObserver _frameSizeObserver; + private boolean _messageCompressionSupported; public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING } @@ -699,6 +701,7 @@ public class Connection extends ConnectionInvoker public void setServerProperties(final Map<String, Object> serverProperties) { _serverProperties = serverProperties == null ? Collections.<String, Object>emptyMap() : serverProperties; + _messageCompressionSupported = Boolean.parseBoolean(String.valueOf(_serverProperties.get(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED))); } public Map<String, Object> getServerProperties() @@ -848,4 +851,9 @@ public class Connection extends ConnectionInvoker }; } } + + public boolean isMessageCompressionSupported() + { + return _messageCompressionSupported; + } } diff --git a/java/common/src/main/java/org/apache/qpid/util/ByteBufferInputStream.java b/java/common/src/main/java/org/apache/qpid/util/ByteBufferInputStream.java index b72b342187..14b804f8c0 100644 --- a/java/common/src/main/java/org/apache/qpid/util/ByteBufferInputStream.java +++ b/java/common/src/main/java/org/apache/qpid/util/ByteBufferInputStream.java @@ -92,4 +92,9 @@ public class ByteBufferInputStream extends InputStream { return _buffer.remaining(); } + + @Override + public void close() + { + } } diff --git a/java/common/src/main/java/org/apache/qpid/util/GZIPUtils.java b/java/common/src/main/java/org/apache/qpid/util/GZIPUtils.java new file mode 100644 index 0000000000..b5ba0b29af --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/util/GZIPUtils.java @@ -0,0 +1,119 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GZIPUtils +{ + private static final Logger LOGGER = LoggerFactory.getLogger(GZIPUtils.class); + + public static final String GZIP_CONTENT_ENCODING = "gzip"; + + + /** + * Return a new byte array with the compressed contents of the input buffer + * + * @param input byte buffer to compress + * @return a byte array containing the compressed data, or null if the input was null or there was an unexpected + * IOException while compressing + */ + public static byte[] compressBufferToArray(ByteBuffer input) + { + if(input != null) + { + try (ByteArrayOutputStream compressedBuffer = new ByteArrayOutputStream()) + { + try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(compressedBuffer)) + { + if (input.hasArray()) + { + gzipOutputStream.write(input.array(), + input.arrayOffset() + input.position(), + input.remaining()); + } + else + { + + byte[] data = new byte[input.remaining()]; + + input.duplicate().get(data); + + gzipOutputStream.write(data); + } + } + return compressedBuffer.toByteArray(); + } + catch (IOException e) + { + LOGGER.warn("Unexpected IOException when attempting to compress with gzip", e); + } + } + return null; + } + + public static byte[] uncompressBufferToArray(ByteBuffer contentBuffer) + { + if(contentBuffer != null) + { + try (ByteBufferInputStream input = new ByteBufferInputStream(contentBuffer)) + { + return uncompressStreamToArray(input); + } + } + else + { + return null; + } + } + + public static byte[] uncompressStreamToArray(InputStream stream) + { + if(stream != null) + { + try (GZIPInputStream gzipInputStream = new GZIPInputStream(stream)) + { + ByteArrayOutputStream inflatedContent = new ByteArrayOutputStream(); + int read; + byte[] buf = new byte[4096]; + while ((read = gzipInputStream.read(buf)) != -1) + { + inflatedContent.write(buf, 0, read); + } + return inflatedContent.toByteArray(); + } + catch (IOException e) + { + + LOGGER.warn("Unexpected IOException when attempting to uncompress with gzip", e); + } + } + return null; + } +} diff --git a/java/common/src/test/java/org/apache/qpid/util/GZIPUtilsTest.java b/java/common/src/test/java/org/apache/qpid/util/GZIPUtilsTest.java new file mode 100644 index 0000000000..60e80da15f --- /dev/null +++ b/java/common/src/test/java/org/apache/qpid/util/GZIPUtilsTest.java @@ -0,0 +1,102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Arrays; + +import junit.framework.TestCase; + +public class GZIPUtilsTest extends TestCase +{ + public void testCompressUncompress() throws Exception + { + byte[] data = new byte[1024]; + Arrays.fill(data, (byte)'a'); + byte[] compressed = GZIPUtils.compressBufferToArray(ByteBuffer.wrap(data)); + assertTrue("Compression didn't compress", compressed.length < data.length); + byte[] uncompressed = GZIPUtils.uncompressBufferToArray(ByteBuffer.wrap(compressed)); + assertTrue("Compression not reversible", Arrays.equals(data,uncompressed)); + } + + public void testUncompressNonZipReturnsNull() throws Exception + { + byte[] data = new byte[1024]; + Arrays.fill(data, (byte)'a'); + assertNull("Non zipped data should not uncompress", GZIPUtils.uncompressBufferToArray(ByteBuffer.wrap(data))); + } + + public void testUncompressStreamWithErrorReturnsNull() throws Exception + { + InputStream is = new InputStream() + { + @Override + public int read() throws IOException + { + throw new IOException(); + } + }; + assertNull("Stream error should return null", GZIPUtils.uncompressStreamToArray(is)); + } + + + public void testUncompressNullStreamReturnsNull() throws Exception + { + assertNull("Null Stream should return null", GZIPUtils.uncompressStreamToArray(null)); + } + public void testUncompressNullBufferReturnsNull() throws Exception + { + assertNull("Null buffer should return null", GZIPUtils.uncompressBufferToArray(null)); + } + + public void testCompressNullArrayReturnsNull() + { + assertNull(GZIPUtils.compressBufferToArray(null)); + } + + public void testNonHeapBuffers() throws Exception + { + + byte[] data = new byte[1024]; + Arrays.fill(data, (byte)'a'); + ByteBuffer directBuffer = ByteBuffer.allocateDirect(1024); + directBuffer.put(data); + directBuffer.flip(); + + byte[] compressed = GZIPUtils.compressBufferToArray(directBuffer); + + assertTrue("Compression didn't compress", compressed.length < data.length); + + directBuffer.clear(); + directBuffer.position(1); + directBuffer = directBuffer.slice(); + directBuffer.put(compressed); + directBuffer.flip(); + + byte[] uncompressed = GZIPUtils.uncompressBufferToArray(directBuffer); + + assertTrue("Compression not reversible", Arrays.equals(data,uncompressed)); + + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java b/java/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java new file mode 100644 index 0000000000..e1fca306ce --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java @@ -0,0 +1,236 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.naming.NamingException; + +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.server.management.plugin.HttpManagement; +import org.apache.qpid.server.model.AuthenticationProvider; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Plugin; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.systest.rest.RestTestHelper; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.TestBrokerConfiguration; +import org.apache.qpid.url.URLSyntaxException; + +public class MessageCompressionTest extends QpidBrokerTestCase +{ + private RestTestHelper _restTestHelper = new RestTestHelper(findFreePort()); + + @Override + public void setUp() throws Exception + { + // do nothing - only call setup after props set + } + + public void doActualSetUp() throws Exception + { + // use webadmin account to perform tests + _restTestHelper.setUsernameAndPassword("webadmin", "webadmin"); + + TestBrokerConfiguration config = getBrokerConfiguration(); + config.addHttpManagementConfiguration(); + config.setObjectAttribute(Port.class, + TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT, + Port.PORT, + _restTestHelper.getHttpPort()); + + config.setObjectAttribute(AuthenticationProvider.class, TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER, + "secureOnlyMechanisms", + "{}"); + + // set password authentication provider on http port for the tests + config.setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT, Port.AUTHENTICATION_PROVIDER, + TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER); + config.setObjectAttribute(Plugin.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_MANAGEMENT, HttpManagement.HTTP_BASIC_AUTHENTICATION_ENABLED, true); + + super.setUp(); + } + + @Override + protected void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + _restTestHelper.tearDown(); + } + } + + public void testSenderCompressesReceiverUncompresses() throws Exception + { + doTestCompression(true, true, true); + } + + public void testSenderCompressesOnly() throws Exception + { + doTestCompression(true, false, true); + + } + + public void testReceiverUncompressesOnly() throws Exception + { + doTestCompression(false, true, true); + + } + + public void testNoCompression() throws Exception + { + doTestCompression(false, false, true); + + } + + + public void testDisablingCompressionAtBroker() throws Exception + { + doTestCompression(true, true, false); + } + + + private void doTestCompression(final boolean senderCompresses, + final boolean receiverUncompresses, + final boolean brokerCompressionEnabled) throws Exception + { + + setTestSystemProperty(Broker.BROKER_MESSAGE_COMPRESSION_ENABLED, String.valueOf(brokerCompressionEnabled)); + + doActualSetUp(); + + String messageText = createMessageText(); + Connection senderConnection = getConnection(senderCompresses); + String virtualPath = getConnectionFactory().getVirtualPath(); + String testQueueName = getTestQueueName(); + + // create the queue using REST and bind it + assertEquals(201, + _restTestHelper.submitRequest("/api/latest/queue" + + virtualPath + + virtualPath + + "/" + + testQueueName, "PUT", Collections.<String, Object>emptyMap())); + assertEquals(201, + _restTestHelper.submitRequest("/api/latest/binding" + + virtualPath + + virtualPath + + "/amq.direct/" + + testQueueName + + "/" + + testQueueName, "PUT", Collections.<String, Object>emptyMap())); + + Session session = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // send a large message + MessageProducer producer = session.createProducer(getTestQueue()); + TextMessage sentMessage = session.createTextMessage(messageText); + sentMessage.setStringProperty("bar", "foo"); + + producer.send(sentMessage); + ((AMQSession)session).sync(); + + // get the number of bytes received at the broker on the connection + List<Map<String, Object>> connectionRestOutput = _restTestHelper.getJsonAsList("/api/latest/connection"); + assertEquals(1, connectionRestOutput.size()); + Map statistics = (Map) connectionRestOutput.get(0).get("statistics"); + int bytesIn = (Integer) statistics.get("bytesIn"); + + // if sending compressed then the bytesIn statistic for the connection should reflect the compressed size of the + // message + if(senderCompresses && brokerCompressionEnabled) + { + assertTrue("Message was not sent compressed", bytesIn < messageText.length()); + } + else + { + assertFalse("Message was incorrectly sent compressed", bytesIn < messageText.length()); + } + senderConnection.close(); + + // receive the message + Connection consumerConnection = getConnection(receiverUncompresses); + session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(getTestQueue()); + consumerConnection.start(); + + TextMessage message = (TextMessage) consumer.receive(500l); + assertNotNull("Message was not received", message); + assertEquals("Message was corrupted", messageText, message.getText()); + assertEquals("Header was corrupted", "foo", message.getStringProperty("bar")); + + // get the number of bytes sent by the broker + connectionRestOutput = _restTestHelper.getJsonAsList("/api/latest/connection"); + assertEquals(1, connectionRestOutput.size()); + statistics = (Map) connectionRestOutput.get(0).get("statistics"); + int bytesOut = (Integer) statistics.get("bytesOut"); + + // if receiving compressed the bytes out statistic from the connection should reflect the compressed size of the + // message + if(receiverUncompresses && brokerCompressionEnabled) + { + assertTrue("Message was not received compressed", bytesOut < messageText.length()); + } + else + { + assertFalse("Message was incorrectly received compressed", bytesOut < messageText.length()); + } + + consumerConnection.close(); + } + + private String createMessageText() + { + StringBuilder stringBuilder = new StringBuilder(); + while(stringBuilder.length() < 2048*1024) + { + stringBuilder.append("This should compress easily. "); + } + return stringBuilder.toString(); + } + + private Connection getConnection(final boolean compress) throws URLSyntaxException, NamingException, JMSException + { + AMQConnectionURL url = new AMQConnectionURL(getConnectionFactory().getConnectionURLString()); + + url.setOption(ConnectionURL.OPTIONS_COMPRESS_MESSAGES,String.valueOf(compress)); + url = new AMQConnectionURL(url.toString()); + url.setUsername(GUEST_USERNAME); + url.setPassword(GUEST_PASSWORD); + url.setOption(ConnectionURL.OPTIONS_COMPRESS_MESSAGES,String.valueOf(compress)); + return getConnection(url); + } + +} |