summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java118
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java11
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java34
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java40
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java43
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java16
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java18
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java168
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java5
10 files changed, 328 insertions, 130 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 7d9dfcd600..b64d355f80 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -20,6 +20,39 @@
*/
package org.apache.qpid.client;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.ConnectException;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.nio.channels.UnresolvedAddressException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import javax.naming.StringRefAddr;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,38 +82,6 @@ import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.URLSyntaxException;
-import javax.jms.ConnectionConsumer;
-import javax.jms.ConnectionMetaData;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueSession;
-import javax.jms.ServerSessionPool;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
-import javax.naming.NamingException;
-import javax.naming.Reference;
-import javax.naming.Referenceable;
-import javax.naming.StringRefAddr;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.net.ConnectException;
-import java.net.SocketAddress;
-import java.net.UnknownHostException;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
@@ -191,6 +192,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
//Address resolution purposes
private volatile long _lastFailoverTime = 0;
+ private boolean _compressMessages;
+ private int _messageCompressionThresholdSize;
+
/**
* @param broker brokerdetails
* @param username username
@@ -325,6 +329,31 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
Boolean.parseBoolean(System.getProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "false"));
}
+ if(connectionURL.getOption(ConnectionURL.OPTIONS_COMPRESS_MESSAGES) != null)
+ {
+ _compressMessages = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_COMPRESS_MESSAGES));
+ }
+ else
+ {
+ _compressMessages =
+ Boolean.parseBoolean(System.getProperty(ClientProperties.CONNECTION_OPTION_COMPRESS_MESSAGES,
+ String.valueOf(ClientProperties.DEFAULT_CONNECTION_OPTION_COMPRESS_MESSAGES)));
+ }
+
+
+ if(connectionURL.getOption(ConnectionURL.OPTIONS_MESSAGES_COMPRESSION_THRESHOLD_SIZE) != null)
+ {
+ _messageCompressionThresholdSize = Integer.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_MESSAGES_COMPRESSION_THRESHOLD_SIZE));
+ }
+ else
+ {
+ _messageCompressionThresholdSize = Integer.getInteger(ClientProperties.CONNECTION_OPTION_MESSAGE_COMPRESSION_THRESHOLD_SIZE,
+ ClientProperties.DEFAULT_MESSAGE_COMPRESSION_THRESHOLD_SIZE);
+ }
+ if(_messageCompressionThresholdSize <= 0)
+ {
+ _messageCompressionThresholdSize = Integer.MAX_VALUE;
+ }
String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10");
if (_logger.isDebugEnabled())
@@ -449,16 +478,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- if ((message == null) || message.equals(""))
+ if (message == null)
{
- if (message == null)
- {
- message = "Unable to Connect";
- }
- else // can only be "" if getMessage() returned it therfore lastException != null
- {
- message = "Unable to Connect:" + connectionException.getClass();
- }
+ message = "Unable to Connect";
+ }
+ else if("".equals(message))
+ {
+ message = "Unable to Connect:" + connectionException.getClass();
}
for (Throwable th = connectionException; th != null; th = th.getCause())
@@ -1543,6 +1569,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _syncPublish;
}
+ public boolean isMessageCompressionDesired()
+ {
+ return _compressMessages;
+ }
+
public int getNextChannelID()
{
return _sessions.getNextChannelId();
@@ -1615,4 +1646,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return super.setClosed();
}
+
+ public int getMessageCompressionThresholdSize()
+ {
+ return _messageCompressionThresholdSize;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index 0329deee03..74ca1ed74f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -20,6 +20,11 @@
*/
package org.apache.qpid.client;
+import java.io.IOException;
+
+import javax.jms.JMSException;
+import javax.jms.XASession;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -27,10 +32,6 @@ import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Session;
-import javax.jms.JMSException;
-import javax.jms.XASession;
-import java.io.IOException;
-
public interface AMQConnectionDelegate
{
ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException;
@@ -82,4 +83,6 @@ public interface AMQConnectionDelegate
void setHeartbeatListener(HeartbeatListener listener);
boolean supportsIsBound();
+
+ boolean isMessageCompressionSupported();
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 95b1178407..4e9164c3b0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -21,6 +21,17 @@
package org.apache.qpid.client;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.XASession;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,7 +40,6 @@ import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.transport.ClientConnectionDelegate;
import org.apache.qpid.common.ServerPropertyNames;
-
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
@@ -48,16 +58,6 @@ import org.apache.qpid.transport.SessionDetachCode;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.TransportException;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.XASession;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ConnectionListener
{
/**
@@ -441,7 +441,11 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
try
{
clientProps.put(ConnectionStartProperties.CLIENT_ID_0_10, _conn.getClientID());
- conSettings.setClientProperties(clientProps);
+ if(_conn.isMessageCompressionDesired())
+ {
+ clientProps.put(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED, Boolean.TRUE.toString());
+ }
+ conSettings.setClientProperties(clientProps);
}
catch (JMSException e)
{
@@ -504,4 +508,10 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
//0-10 supports the isBound method
return true;
}
+
+ @Override
+ public boolean isMessageCompressionSupported()
+ {
+ return _qpidConnection.isMessageCompressionSupported();
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index dfbf7ec60a..5242629a91 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -20,8 +20,21 @@
*/
package org.apache.qpid.client;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.nio.channels.UnresolvedAddressException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import javax.jms.JMSException;
+import javax.jms.XASession;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -49,22 +62,11 @@ import org.apache.qpid.transport.network.Transport;
import org.apache.qpid.transport.network.security.SecurityLayer;
import org.apache.qpid.transport.network.security.SecurityLayerFactory;
-import javax.jms.JMSException;
-import javax.jms.XASession;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.nio.channels.UnresolvedAddressException;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.Iterator;
-import java.util.Set;
-
public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class);
private final AMQConnection _conn;
+ private boolean _messageCompressionSupported;
public void closeConnection(long timeout) throws JMSException, AMQException
{
@@ -139,6 +141,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
_conn.getFailoverPolicy().attainedConnection();
_conn.setConnected(true);
_conn.logConnected(network.getLocalAddress(), network.getRemoteAddress());
+ _messageCompressionSupported = checkMessageCompressionSupported();
return null;
}
else
@@ -413,4 +416,17 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
return connectedToQpid;
}
+
+ private boolean checkMessageCompressionSupported()
+ {
+ FieldTable serverProperties = _conn.getProtocolHandler().getProtocolSession().getConnectionStartServerProperties();
+ return serverProperties != null
+ && Boolean.parseBoolean(serverProperties.getString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED));
+
+ }
+
+ public boolean isMessageCompressionSupported()
+ {
+ return _messageCompressionSupported;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 9efc670e99..eb8104b02c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -17,6 +17,19 @@
*/
package org.apache.qpid.client;
+import static org.apache.qpid.transport.Option.NONE;
+import static org.apache.qpid.transport.Option.SYNC;
+import static org.apache.qpid.transport.Option.UNRELIABLE;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,25 +49,15 @@ import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.util.GZIPUtils;
import org.apache.qpid.util.Strings;
-import static org.apache.qpid.transport.Option.NONE;
-import static org.apache.qpid.transport.Option.SYNC;
-import static org.apache.qpid.transport.Option.UNRELIABLE;
-
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
/**
* This is a 0_10 message producer.
*/
public class BasicMessageProducer_0_10 extends BasicMessageProducer
{
+
private static final Logger _logger = LoggerFactory.getLogger(BasicMessageProducer_0_10.class);
private byte[] userIDBytes;
@@ -204,6 +207,22 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
}
ByteBuffer data = message.getData();
+
+ if(data != null
+ && data.remaining() > getConnection().getMessageCompressionThresholdSize()
+ && getConnection().getDelegate().isMessageCompressionSupported()
+ && getConnection().isMessageCompressionDesired()
+ && messageProps.getContentEncoding() == null)
+ {
+ byte[] compressed = GZIPUtils.compressBufferToArray(data);
+ if(compressed != null)
+ {
+ messageProps.setContentEncoding(GZIPUtils.GZIP_CONTENT_ENCODING);
+ data = ByteBuffer.wrap(compressed);
+ }
+ }
+
+
messageProps.setContentLength(data == null ? 0 : data.remaining());
// send the message
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index b9bb03444f..355c456249 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -44,6 +44,7 @@ import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.util.GZIPUtils;
public class BasicMessageProducer_0_8 extends BasicMessageProducer
{
@@ -147,7 +148,20 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
contentHeaderProperties.setPriority((byte) priority);
- final int size = (payload != null) ? payload.limit() : 0;
+ int size = (payload != null) ? payload.remaining() : 0;
+
+ byte[] compressed;
+ if(size > getConnection().getMessageCompressionThresholdSize()
+ && getConnection().getDelegate().isMessageCompressionSupported()
+ && getConnection().isMessageCompressionDesired()
+ && contentHeaderProperties.getEncoding() == null
+ && (compressed = GZIPUtils.compressBufferToArray(payload)) != null)
+ {
+ contentHeaderProperties.setEncoding("gzip");
+ payload = ByteBuffer.wrap(compressed);
+ size = compressed.length;
+
+ }
final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java
index baae072167..e8343fda0a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/CloseWhenNoRouteSettingsHelper.java
@@ -18,11 +18,12 @@
*/
package org.apache.qpid.client.handler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.properties.ConnectionStartProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Used during connection establishment to optionally set the "close when no route" client property
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
index b0c30f82fa..2e817f2966 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
@@ -20,6 +20,13 @@
*/
package org.apache.qpid.client.handler;
+import java.io.UnsupportedEncodingException;
+import java.util.StringTokenizer;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,12 +47,6 @@ import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.properties.ConnectionStartProperties;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-import java.io.UnsupportedEncodingException;
-import java.util.StringTokenizer;
-
public class ConnectionStartMethodHandler implements StateAwareMethodListener<ConnectionStartBody>
{
private static final Logger _log = LoggerFactory.getLogger(ConnectionStartMethodHandler.class);
@@ -173,6 +174,9 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co
ConnectionURL url = getConnectionURL(session);
_closeWhenNoRouteHelper.setClientProperties(clientProperties, url, serverProperties);
+ clientProperties.setString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED,
+ String.valueOf(session.getAMQConnection().isMessageCompressionDesired()));
+
ConnectionStartOkBody connectionStartOkBody = session.getMethodRegistry().createConnectionStartOkBody(clientProperties,new AMQShortString(mechanism),saslResponse,new AMQShortString(locales));
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -188,7 +192,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co
else
{
_log.error("Broker requested Protocol [" + body.getVersionMajor() + "-" + body.getVersionMinor()
- + "] which is not supported by this version of the client library");
+ + "] which is not supported by this version of the client library");
session.closeProtocolSession();
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
index e52ff9acb2..71d07b1fa0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
@@ -20,6 +20,17 @@
*/
package org.apache.qpid.client.message;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
+
+import javax.jms.JMSException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,16 +39,11 @@ import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession_0_8;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.MessageProperties;
-
-import javax.jms.JMSException;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.List;
+import org.apache.qpid.util.GZIPUtils;
public abstract class AbstractJMSMessageFactory implements MessageFactory
{
@@ -52,46 +58,57 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
ByteBuffer data;
final boolean debug = _logger.isDebugEnabled();
- // we optimise the non-fragmented case to avoid copying
- if ((bodies != null) && (bodies.size() == 1))
- {
- if (debug)
- {
- _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.getBodySize() + ")");
- }
+ byte[] uncompressed;
- data = ByteBuffer.wrap(((ContentBody) bodies.get(0)).getPayload());
+ if(GZIPUtils.GZIP_CONTENT_ENCODING.equals(contentHeader.getProperties().getEncodingAsString())
+ && (uncompressed = GZIPUtils.uncompressStreamToArray(new BodyInputStream(bodies))) != null )
+ {
+ contentHeader.getProperties().setEncoding((String)null);
+ data = ByteBuffer.wrap(uncompressed);
}
- else if (bodies != null)
+ else
{
- if (debug)
+ // we optimise the non-fragmented case to avoid copying
+ if ((bodies != null) && (bodies.size() == 1))
{
- _logger.debug("Fragmented message body (" + bodies
- .size() + " frames, bodySize=" + contentHeader.getBodySize() + ")");
- }
+ if (debug)
+ {
+ _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.getBodySize() + ")");
+ }
- data = ByteBuffer.allocate((int) contentHeader.getBodySize()); // XXX: Is cast a problem?
- final Iterator it = bodies.iterator();
- while (it.hasNext())
+ data = ByteBuffer.wrap(((ContentBody) bodies.get(0)).getPayload());
+ }
+ else if (bodies != null)
{
- ContentBody cb = (ContentBody) it.next();
- final ByteBuffer payload = ByteBuffer.wrap(cb.getPayload());
- if(payload.isDirect() || payload.isReadOnly())
+ if (debug)
{
- data.put(payload);
+ _logger.debug("Fragmented message body (" + bodies
+ .size() + " frames, bodySize=" + contentHeader.getBodySize() + ")");
}
- else
+
+ data = ByteBuffer.allocate((int) contentHeader.getBodySize()); // XXX: Is cast a problem?
+ final Iterator it = bodies.iterator();
+ while (it.hasNext())
{
- data.put(payload.array(), payload.arrayOffset(), payload.limit());
+ ContentBody cb = (ContentBody) it.next();
+ final ByteBuffer payload = ByteBuffer.wrap(cb.getPayload());
+ if (payload.isDirect() || payload.isReadOnly())
+ {
+ data.put(payload);
+ }
+ else
+ {
+ data.put(payload.array(), payload.arrayOffset(), payload.limit());
+ }
+
}
+ data.flip();
+ }
+ else // bodies == null
+ {
+ data = ByteBuffer.allocate(0);
}
-
- data.flip();
- }
- else // bodies == null
- {
- data = ByteBuffer.allocate(0);
}
if (debug)
@@ -132,22 +149,42 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
_logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data
.remaining());
}
+ if(GZIPUtils.GZIP_CONTENT_ENCODING.equals(msgProps.getContentEncoding()))
+ {
+ byte[] uncompressed = GZIPUtils.uncompressBufferToArray(data);
+ if(uncompressed != null)
+ {
+ msgProps.setContentEncoding(null);
+ data = ByteBuffer.wrap(uncompressed);
+ }
+ }
AMQMessageDelegate delegate = new AMQMessageDelegate_0_10(msgProps, deliveryProps, messageNbr);
AbstractJMSMessage message = createMessage(delegate, data);
return message;
}
- private static final String asString(byte[] bytes)
+ private ByteBuffer uncompressBody(final InputStream bodyInputStream) throws AMQException
{
- if (bytes == null)
+ final ByteBuffer data;
+ try(GZIPInputStream gzipInputStream = new GZIPInputStream(bodyInputStream))
{
- return null;
+ ByteArrayOutputStream uncompressedBuffer = new ByteArrayOutputStream();
+ int read;
+ byte[] buf = new byte[4096];
+ while((read = gzipInputStream.read(buf))!=-1)
+ {
+ uncompressedBuffer.write(buf,0,read);
+ }
+ byte[] uncompressedBytes = uncompressedBuffer.toByteArray();
+ data = ByteBuffer.wrap(uncompressedBytes);
}
- else
+ catch (IOException e)
{
- return new String(bytes);
+ // TODO - shouldn't happen
+ throw new AMQException("Error uncompressing gzipped message data", e);
}
+ return data;
}
@@ -174,4 +211,57 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
return msg;
}
+ private class BodyInputStream extends InputStream
+ {
+ private final Iterator<ContentBody> _bodiesIter;
+ private byte[] _currentBuffer;
+ private int _currentPos;
+ public BodyInputStream(final List<ContentBody> bodies)
+ {
+ _bodiesIter = bodies.iterator();
+ _currentBuffer = _bodiesIter.next().getPayload();
+ _currentPos = 0;
+ }
+
+ @Override
+ public int read() throws IOException
+ {
+ byte[] buf = new byte[1];
+ int size = read(buf);
+ if(size == -1)
+ {
+ throw new EOFException();
+ }
+ else
+ {
+ return ((int)buf[0])&0xff;
+ }
+ }
+
+ @Override
+ public int read(final byte[] dst, final int off, final int len)
+ {
+ while(_currentPos == _currentBuffer.length)
+ {
+ if(!_bodiesIter.hasNext())
+ {
+ return -1;
+ }
+ else
+ {
+ _currentBuffer = _bodiesIter.next().getPayload();
+ _currentPos = 0;
+ }
+ }
+ int size = Math.min(len, _currentBuffer.length - _currentPos);
+ System.arraycopy(_currentBuffer,_currentPos, dst,off,size);
+ _currentPos+=size;
+ return size;
+ }
+
+ @Override
+ public void close()
+ {
+ }
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
index 2901a5f983..754b90c372 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
@@ -70,6 +70,11 @@ public interface ConnectionURL
*/
public static final String OPTIONS_CLOSE_WHEN_NO_ROUTE = "closeWhenNoRoute";
+
+ public static final String OPTIONS_COMPRESS_MESSAGES = "compressMessages";
+ public static final String OPTIONS_MESSAGES_COMPRESSION_THRESHOLD_SIZE = "messageCompressionThresholdSize";
+
+
public static final String OPTIONS_DEFAULT_TOPIC_EXCHANGE = "defaultTopicExchange";
public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange";
public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange";