summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-08-25 15:25:00 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-08-25 15:25:00 +0000
commit19478e74524f27e26f01117b1b973829718ed44d (patch)
tree214cee6e0e1fb25a553aaaa490b775450ea9e8e2 /qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
parente0e8f4c5087c1c5dc787740d6bd862755bd8daf1 (diff)
downloadqpid-python-19478e74524f27e26f01117b1b973829718ed44d.tar.gz
Merging from trunk r1618230:1618433 in the Java tree
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.30@1620344 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java118
1 files changed, 77 insertions, 41 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 7d9dfcd600..b64d355f80 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -20,6 +20,39 @@
*/
package org.apache.qpid.client;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.ConnectException;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.nio.channels.UnresolvedAddressException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import javax.naming.StringRefAddr;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,38 +82,6 @@ import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.URLSyntaxException;
-import javax.jms.ConnectionConsumer;
-import javax.jms.ConnectionMetaData;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueSession;
-import javax.jms.ServerSessionPool;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
-import javax.naming.NamingException;
-import javax.naming.Reference;
-import javax.naming.Referenceable;
-import javax.naming.StringRefAddr;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.net.ConnectException;
-import java.net.SocketAddress;
-import java.net.UnknownHostException;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
@@ -191,6 +192,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
//Address resolution purposes
private volatile long _lastFailoverTime = 0;
+ private boolean _compressMessages;
+ private int _messageCompressionThresholdSize;
+
/**
* @param broker brokerdetails
* @param username username
@@ -325,6 +329,31 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
Boolean.parseBoolean(System.getProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "false"));
}
+ if(connectionURL.getOption(ConnectionURL.OPTIONS_COMPRESS_MESSAGES) != null)
+ {
+ _compressMessages = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_COMPRESS_MESSAGES));
+ }
+ else
+ {
+ _compressMessages =
+ Boolean.parseBoolean(System.getProperty(ClientProperties.CONNECTION_OPTION_COMPRESS_MESSAGES,
+ String.valueOf(ClientProperties.DEFAULT_CONNECTION_OPTION_COMPRESS_MESSAGES)));
+ }
+
+
+ if(connectionURL.getOption(ConnectionURL.OPTIONS_MESSAGES_COMPRESSION_THRESHOLD_SIZE) != null)
+ {
+ _messageCompressionThresholdSize = Integer.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_MESSAGES_COMPRESSION_THRESHOLD_SIZE));
+ }
+ else
+ {
+ _messageCompressionThresholdSize = Integer.getInteger(ClientProperties.CONNECTION_OPTION_MESSAGE_COMPRESSION_THRESHOLD_SIZE,
+ ClientProperties.DEFAULT_MESSAGE_COMPRESSION_THRESHOLD_SIZE);
+ }
+ if(_messageCompressionThresholdSize <= 0)
+ {
+ _messageCompressionThresholdSize = Integer.MAX_VALUE;
+ }
String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10");
if (_logger.isDebugEnabled())
@@ -449,16 +478,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- if ((message == null) || message.equals(""))
+ if (message == null)
{
- if (message == null)
- {
- message = "Unable to Connect";
- }
- else // can only be "" if getMessage() returned it therfore lastException != null
- {
- message = "Unable to Connect:" + connectionException.getClass();
- }
+ message = "Unable to Connect";
+ }
+ else if("".equals(message))
+ {
+ message = "Unable to Connect:" + connectionException.getClass();
}
for (Throwable th = connectionException; th != null; th = th.getCause())
@@ -1543,6 +1569,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _syncPublish;
}
+ public boolean isMessageCompressionDesired()
+ {
+ return _compressMessages;
+ }
+
public int getNextChannelID()
{
return _sessions.getNextChannelId();
@@ -1615,4 +1646,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return super.setClosed();
}
+
+ public int getMessageCompressionThresholdSize()
+ {
+ return _messageCompressionThresholdSize;
+ }
}