diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-25 15:25:00 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-25 15:25:00 +0000 |
commit | 19478e74524f27e26f01117b1b973829718ed44d (patch) | |
tree | 214cee6e0e1fb25a553aaaa490b775450ea9e8e2 /qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java | |
parent | e0e8f4c5087c1c5dc787740d6bd862755bd8daf1 (diff) | |
download | qpid-python-19478e74524f27e26f01117b1b973829718ed44d.tar.gz |
Merging from trunk r1618230:1618433 in the Java tree
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.30@1620344 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java | 118 |
1 files changed, 77 insertions, 41 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 7d9dfcd600..b64d355f80 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -20,6 +20,39 @@ */ package org.apache.qpid.client; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.ConnectException; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.nio.channels.UnresolvedAddressException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.ConnectionConsumer; +import javax.jms.ConnectionMetaData; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueSession; +import javax.jms.ServerSessionPool; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicSession; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import javax.naming.StringRefAddr; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,38 +82,6 @@ import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.URLSyntaxException; -import javax.jms.ConnectionConsumer; -import javax.jms.ConnectionMetaData; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.IllegalStateException; -import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.QueueConnection; -import javax.jms.QueueSession; -import javax.jms.ServerSessionPool; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicSession; -import javax.naming.NamingException; -import javax.naming.Reference; -import javax.naming.Referenceable; -import javax.naming.StringRefAddr; -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.net.ConnectException; -import java.net.SocketAddress; -import java.net.UnknownHostException; -import java.nio.channels.UnresolvedAddressException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); @@ -191,6 +192,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect //Address resolution purposes private volatile long _lastFailoverTime = 0; + private boolean _compressMessages; + private int _messageCompressionThresholdSize; + /** * @param broker brokerdetails * @param username username @@ -325,6 +329,31 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect Boolean.parseBoolean(System.getProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "false")); } + if(connectionURL.getOption(ConnectionURL.OPTIONS_COMPRESS_MESSAGES) != null) + { + _compressMessages = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_COMPRESS_MESSAGES)); + } + else + { + _compressMessages = + Boolean.parseBoolean(System.getProperty(ClientProperties.CONNECTION_OPTION_COMPRESS_MESSAGES, + String.valueOf(ClientProperties.DEFAULT_CONNECTION_OPTION_COMPRESS_MESSAGES))); + } + + + if(connectionURL.getOption(ConnectionURL.OPTIONS_MESSAGES_COMPRESSION_THRESHOLD_SIZE) != null) + { + _messageCompressionThresholdSize = Integer.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_MESSAGES_COMPRESSION_THRESHOLD_SIZE)); + } + else + { + _messageCompressionThresholdSize = Integer.getInteger(ClientProperties.CONNECTION_OPTION_MESSAGE_COMPRESSION_THRESHOLD_SIZE, + ClientProperties.DEFAULT_MESSAGE_COMPRESSION_THRESHOLD_SIZE); + } + if(_messageCompressionThresholdSize <= 0) + { + _messageCompressionThresholdSize = Integer.MAX_VALUE; + } String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10"); if (_logger.isDebugEnabled()) @@ -449,16 +478,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - if ((message == null) || message.equals("")) + if (message == null) { - if (message == null) - { - message = "Unable to Connect"; - } - else // can only be "" if getMessage() returned it therfore lastException != null - { - message = "Unable to Connect:" + connectionException.getClass(); - } + message = "Unable to Connect"; + } + else if("".equals(message)) + { + message = "Unable to Connect:" + connectionException.getClass(); } for (Throwable th = connectionException; th != null; th = th.getCause()) @@ -1543,6 +1569,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _syncPublish; } + public boolean isMessageCompressionDesired() + { + return _compressMessages; + } + public int getNextChannelID() { return _sessions.getNextChannelId(); @@ -1615,4 +1646,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return super.setClosed(); } + + public int getMessageCompressionThresholdSize() + { + return _messageCompressionThresholdSize; + } } |