diff options
Diffstat (limited to 'qpid/java/client')
10 files changed, 328 insertions, 130 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 7d9dfcd600..b64d355f80 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -20,6 +20,39 @@ */ package org.apache.qpid.client; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.ConnectException; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.nio.channels.UnresolvedAddressException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.ConnectionConsumer; +import javax.jms.ConnectionMetaData; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueSession; +import javax.jms.ServerSessionPool; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicSession; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import javax.naming.StringRefAddr; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,38 +82,6 @@ import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.URLSyntaxException; -import javax.jms.ConnectionConsumer; -import javax.jms.ConnectionMetaData; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.IllegalStateException; -import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.QueueConnection; -import javax.jms.QueueSession; -import javax.jms.ServerSessionPool; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicSession; -import javax.naming.NamingException; -import javax.naming.Reference; -import javax.naming.Referenceable; -import javax.naming.StringRefAddr; -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.net.ConnectException; -import java.net.SocketAddress; -import java.net.UnknownHostException; -import java.nio.channels.UnresolvedAddressException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); @@ -191,6 +192,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect //Address resolution purposes private volatile long _lastFailoverTime = 0; + private boolean _compressMessages; + private int _messageCompressionThresholdSize; + /** * @param broker brokerdetails * @param username username @@ -325,6 +329,31 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect Boolean.parseBoolean(System.getProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "false")); } + if(connectionURL.getOption(ConnectionURL.OPTIONS_COMPRESS_MESSAGES) != null) + { + _compressMessages = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_COMPRESS_MESSAGES)); + } + else + { + _compressMessages = + Boolean.parseBoolean(System.getProperty(ClientProperties.CONNECTION_OPTION_COMPRESS_MESSAGES, + String.valueOf(ClientProperties.DEFAULT_CONNECTION_OPTION_COMPRESS_MESSAGES))); + } + + + if(connectionURL.getOption(ConnectionURL.OPTIONS_MESSAGES_COMPRESSION_THRESHOLD_SIZE) != null) + { + _messageCompressionThresholdSize = Integer.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_MESSAGES_COMPRESSION_THRESHOLD_SIZE)); + } + else + { + _messageCompressionThresholdSize = Integer.getInteger(ClientProperties.CONNECTION_OPTION_MESSAGE_COMPRESSION_THRESHOLD_SIZE, + ClientProperties.DEFAULT_MESSAGE_COMPRESSION_THRESHOLD_SIZE); + } + if(_messageCompressionThresholdSize <= 0) + { + _messageCompressionThresholdSize = Integer.MAX_VALUE; + } String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10"); if (_logger.isDebugEnabled()) @@ -449,16 +478,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - if ((message == null) || message.equals("")) + if (message == null) { - if (message == null) - { - message = "Unable to Connect"; - } - else // can only be "" if getMessage() returned it therfore lastException != null - { - message = "Unable to Connect:" + connectionException.getClass(); - } + message = "Unable to Connect"; + } + else if("".equals(message)) + { + message = "Unable to Connect:" + connectionException.getClass(); } for (Throwable th = connectionException; th != null; th = th.getCause()) @@ -1543,6 +1569,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _syncPublish; } + public boolean isMessageCompressionDesired() + { + return _compressMessages; + } + public int getNextChannelID() { return _sessions.getNextChannelId(); @@ -1615,4 +1646,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return super.setClosed(); } + + public int getMessageCompressionThresholdSize() + { + return _messageCompressionThresholdSize; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 0329deee03..74ca1ed74f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -20,6 +20,11 @@ */ package org.apache.qpid.client; +import java.io.IOException; + +import javax.jms.JMSException; +import javax.jms.XASession; + import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -27,10 +32,6 @@ import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Session; -import javax.jms.JMSException; -import javax.jms.XASession; -import java.io.IOException; - public interface AMQConnectionDelegate { ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException; @@ -82,4 +83,6 @@ public interface AMQConnectionDelegate void setHeartbeatListener(HeartbeatListener listener); boolean supportsIsBound(); + + boolean isMessageCompressionSupported(); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 95b1178407..4e9164c3b0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -21,6 +21,17 @@ package org.apache.qpid.client; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.XASession; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,7 +40,6 @@ import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.transport.ClientConnectionDelegate; import org.apache.qpid.common.ServerPropertyNames; - import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ChannelLimitReachedException; @@ -48,16 +58,6 @@ import org.apache.qpid.transport.SessionDetachCode; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.TransportException; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.XASession; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; - public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ConnectionListener { /** @@ -441,7 +441,11 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec try { clientProps.put(ConnectionStartProperties.CLIENT_ID_0_10, _conn.getClientID()); - conSettings.setClientProperties(clientProps); + if(_conn.isMessageCompressionDesired()) + { + clientProps.put(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED, Boolean.TRUE.toString()); + } + conSettings.setClientProperties(clientProps); } catch (JMSException e) { @@ -504,4 +508,10 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec //0-10 supports the isBound method return true; } + + @Override + public boolean isMessageCompressionSupported() + { + return _qpidConnection.isMessageCompressionSupported(); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index dfbf7ec60a..5242629a91 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -20,8 +20,21 @@ */ package org.apache.qpid.client; +import java.io.IOException; +import java.net.ConnectException; +import java.nio.channels.UnresolvedAddressException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.Iterator; +import java.util.Set; + +import javax.jms.JMSException; +import javax.jms.XASession; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -49,22 +62,11 @@ import org.apache.qpid.transport.network.Transport; import org.apache.qpid.transport.network.security.SecurityLayer; import org.apache.qpid.transport.network.security.SecurityLayerFactory; -import javax.jms.JMSException; -import javax.jms.XASession; - -import java.io.IOException; -import java.net.ConnectException; -import java.nio.channels.UnresolvedAddressException; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.Iterator; -import java.util.Set; - public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class); private final AMQConnection _conn; + private boolean _messageCompressionSupported; public void closeConnection(long timeout) throws JMSException, AMQException { @@ -139,6 +141,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate _conn.getFailoverPolicy().attainedConnection(); _conn.setConnected(true); _conn.logConnected(network.getLocalAddress(), network.getRemoteAddress()); + _messageCompressionSupported = checkMessageCompressionSupported(); return null; } else @@ -413,4 +416,17 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate return connectedToQpid; } + + private boolean checkMessageCompressionSupported() + { + FieldTable serverProperties = _conn.getProtocolHandler().getProtocolSession().getConnectionStartServerProperties(); + return serverProperties != null + && Boolean.parseBoolean(serverProperties.getString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED)); + + } + + public boolean isMessageCompressionSupported() + { + return _messageCompressionSupported; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 9efc670e99..eb8104b02c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -17,6 +17,19 @@ */ package org.apache.qpid.client; +import static org.apache.qpid.transport.Option.NONE; +import static org.apache.qpid.transport.Option.SYNC; +import static org.apache.qpid.transport.Option.UNRELIABLE; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,25 +49,15 @@ import org.apache.qpid.transport.MessageDeliveryPriority; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.Option; import org.apache.qpid.transport.TransportException; +import org.apache.qpid.util.GZIPUtils; import org.apache.qpid.util.Strings; -import static org.apache.qpid.transport.Option.NONE; -import static org.apache.qpid.transport.Option.SYNC; -import static org.apache.qpid.transport.Option.UNRELIABLE; - -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.Message; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - /** * This is a 0_10 message producer. */ public class BasicMessageProducer_0_10 extends BasicMessageProducer { + private static final Logger _logger = LoggerFactory.getLogger(BasicMessageProducer_0_10.class); private byte[] userIDBytes; @@ -204,6 +207,22 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer } ByteBuffer data = message.getData(); + + if(data != null + && data.remaining() > getConnection().getMessageCompressionThresholdSize() + && getConnection().getDelegate().isMessageCompressionSupported() + && getConnection().isMessageCompressionDesired() + && messageProps.getContentEncoding() == null) + { + byte[] compressed = GZIPUtils.compressBufferToArray(data); + if(compressed != null) + { + messageProps.setContentEncoding(GZIPUtils.GZIP_CONTENT_ENCODING); + data = ByteBuffer.wrap(compressed); + } + } + + messageProps.setContentLength(data == null ? 0 : data.remaining()); // send the message diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index b9bb03444f..355c456249 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -44,6 +44,7 @@ import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.util.GZIPUtils; public class BasicMessageProducer_0_8 extends BasicMessageProducer { @@ -147,7 +148,20 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer contentHeaderProperties.setDeliveryMode((byte) deliveryMode); contentHeaderProperties.setPriority((byte) priority); - final int size = (payload != null) ? payload.limit() : 0; + int size = (payload != null) ? payload.remaining() : 0; + + byte[] compressed; + if(size > getConnection().getMessageCompressionThresholdSize() + && getConnection().getDelegate().isMessageCompressionSupported() + && getConnection().isMessageCompressionDesired() + && contentHeaderProperties.getEncoding() == null + && (compressed = GZIPUtils.compressBufferToArray(payload)) != null) + { + contentHeaderProperties.setEncoding("gzip"); + payload = ByteBuffer.wrap(compressed); + size = compressed.length; + + } final int contentBodyFrameCount = calculateContentBodyFrameCount(payload); final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount]; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java index baae072167..e8343fda0a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java @@ -18,11 +18,12 @@ */ package org.apache.qpid.client.handler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.properties.ConnectionStartProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Used during connection establishment to optionally set the "close when no route" client property diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index b0c30f82fa..2e817f2966 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -20,6 +20,13 @@ */ package org.apache.qpid.client.handler; +import java.io.UnsupportedEncodingException; +import java.util.StringTokenizer; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,12 +47,6 @@ import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.properties.ConnectionStartProperties; -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; -import java.io.UnsupportedEncodingException; -import java.util.StringTokenizer; - public class ConnectionStartMethodHandler implements StateAwareMethodListener<ConnectionStartBody> { private static final Logger _log = LoggerFactory.getLogger(ConnectionStartMethodHandler.class); @@ -173,6 +174,9 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co ConnectionURL url = getConnectionURL(session); _closeWhenNoRouteHelper.setClientProperties(clientProperties, url, serverProperties); + clientProperties.setString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED, + String.valueOf(session.getAMQConnection().isMessageCompressionDesired())); + ConnectionStartOkBody connectionStartOkBody = session.getMethodRegistry().createConnectionStartOkBody(clientProperties,new AMQShortString(mechanism),saslResponse,new AMQShortString(locales)); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. @@ -188,7 +192,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co else { _log.error("Broker requested Protocol [" + body.getVersionMajor() + "-" + body.getVersionMinor() - + "] which is not supported by this version of the client library"); + + "] which is not supported by this version of the client library"); session.closeProtocolSession(); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index e52ff9acb2..71d07b1fa0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -20,6 +20,17 @@ */ package org.apache.qpid.client.message; +import java.io.ByteArrayOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import java.util.zip.GZIPInputStream; + +import javax.jms.JMSException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,16 +39,11 @@ import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession_0_8; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.MessageProperties; - -import javax.jms.JMSException; -import java.nio.ByteBuffer; -import java.util.Iterator; -import java.util.List; +import org.apache.qpid.util.GZIPUtils; public abstract class AbstractJMSMessageFactory implements MessageFactory { @@ -52,46 +58,57 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory ByteBuffer data; final boolean debug = _logger.isDebugEnabled(); - // we optimise the non-fragmented case to avoid copying - if ((bodies != null) && (bodies.size() == 1)) - { - if (debug) - { - _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.getBodySize() + ")"); - } + byte[] uncompressed; - data = ByteBuffer.wrap(((ContentBody) bodies.get(0)).getPayload()); + if(GZIPUtils.GZIP_CONTENT_ENCODING.equals(contentHeader.getProperties().getEncodingAsString()) + && (uncompressed = GZIPUtils.uncompressStreamToArray(new BodyInputStream(bodies))) != null ) + { + contentHeader.getProperties().setEncoding((String)null); + data = ByteBuffer.wrap(uncompressed); } - else if (bodies != null) + else { - if (debug) + // we optimise the non-fragmented case to avoid copying + if ((bodies != null) && (bodies.size() == 1)) { - _logger.debug("Fragmented message body (" + bodies - .size() + " frames, bodySize=" + contentHeader.getBodySize() + ")"); - } + if (debug) + { + _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.getBodySize() + ")"); + } - data = ByteBuffer.allocate((int) contentHeader.getBodySize()); // XXX: Is cast a problem? - final Iterator it = bodies.iterator(); - while (it.hasNext()) + data = ByteBuffer.wrap(((ContentBody) bodies.get(0)).getPayload()); + } + else if (bodies != null) { - ContentBody cb = (ContentBody) it.next(); - final ByteBuffer payload = ByteBuffer.wrap(cb.getPayload()); - if(payload.isDirect() || payload.isReadOnly()) + if (debug) { - data.put(payload); + _logger.debug("Fragmented message body (" + bodies + .size() + " frames, bodySize=" + contentHeader.getBodySize() + ")"); } - else + + data = ByteBuffer.allocate((int) contentHeader.getBodySize()); // XXX: Is cast a problem? + final Iterator it = bodies.iterator(); + while (it.hasNext()) { - data.put(payload.array(), payload.arrayOffset(), payload.limit()); + ContentBody cb = (ContentBody) it.next(); + final ByteBuffer payload = ByteBuffer.wrap(cb.getPayload()); + if (payload.isDirect() || payload.isReadOnly()) + { + data.put(payload); + } + else + { + data.put(payload.array(), payload.arrayOffset(), payload.limit()); + } + } + data.flip(); + } + else // bodies == null + { + data = ByteBuffer.allocate(0); } - - data.flip(); - } - else // bodies == null - { - data = ByteBuffer.allocate(0); } if (debug) @@ -132,22 +149,42 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data .remaining()); } + if(GZIPUtils.GZIP_CONTENT_ENCODING.equals(msgProps.getContentEncoding())) + { + byte[] uncompressed = GZIPUtils.uncompressBufferToArray(data); + if(uncompressed != null) + { + msgProps.setContentEncoding(null); + data = ByteBuffer.wrap(uncompressed); + } + } AMQMessageDelegate delegate = new AMQMessageDelegate_0_10(msgProps, deliveryProps, messageNbr); AbstractJMSMessage message = createMessage(delegate, data); return message; } - private static final String asString(byte[] bytes) + private ByteBuffer uncompressBody(final InputStream bodyInputStream) throws AMQException { - if (bytes == null) + final ByteBuffer data; + try(GZIPInputStream gzipInputStream = new GZIPInputStream(bodyInputStream)) { - return null; + ByteArrayOutputStream uncompressedBuffer = new ByteArrayOutputStream(); + int read; + byte[] buf = new byte[4096]; + while((read = gzipInputStream.read(buf))!=-1) + { + uncompressedBuffer.write(buf,0,read); + } + byte[] uncompressedBytes = uncompressedBuffer.toByteArray(); + data = ByteBuffer.wrap(uncompressedBytes); } - else + catch (IOException e) { - return new String(bytes); + // TODO - shouldn't happen + throw new AMQException("Error uncompressing gzipped message data", e); } + return data; } @@ -174,4 +211,57 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory return msg; } + private class BodyInputStream extends InputStream + { + private final Iterator<ContentBody> _bodiesIter; + private byte[] _currentBuffer; + private int _currentPos; + public BodyInputStream(final List<ContentBody> bodies) + { + _bodiesIter = bodies.iterator(); + _currentBuffer = _bodiesIter.next().getPayload(); + _currentPos = 0; + } + + @Override + public int read() throws IOException + { + byte[] buf = new byte[1]; + int size = read(buf); + if(size == -1) + { + throw new EOFException(); + } + else + { + return ((int)buf[0])&0xff; + } + } + + @Override + public int read(final byte[] dst, final int off, final int len) + { + while(_currentPos == _currentBuffer.length) + { + if(!_bodiesIter.hasNext()) + { + return -1; + } + else + { + _currentBuffer = _bodiesIter.next().getPayload(); + _currentPos = 0; + } + } + int size = Math.min(len, _currentBuffer.length - _currentPos); + System.arraycopy(_currentBuffer,_currentPos, dst,off,size); + _currentPos+=size; + return size; + } + + @Override + public void close() + { + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java index 2901a5f983..754b90c372 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java @@ -70,6 +70,11 @@ public interface ConnectionURL */ public static final String OPTIONS_CLOSE_WHEN_NO_ROUTE = "closeWhenNoRoute"; + + public static final String OPTIONS_COMPRESS_MESSAGES = "compressMessages"; + public static final String OPTIONS_MESSAGES_COMPRESSION_THRESHOLD_SIZE = "messageCompressionThresholdSize"; + + public static final String OPTIONS_DEFAULT_TOPIC_EXCHANGE = "defaultTopicExchange"; public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange"; public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange"; |