summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQConnection.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java50
1 files changed, 48 insertions, 2 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index d80858a7a1..9612417266 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -179,6 +179,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// new amqp-0-10 encoded format.
private boolean _useLegacyMapMessageFormat;
+ // Indicates whether to use the old stream message format or the
+ // new amqp-0-10 list encoded format.
+ private boolean _useLegacyStreamMessageFormat;
+
+ // When sending to a Queue destination for the first time, check that the queue is bound
+ private final boolean _validateQueueOnSend;
+
//used to track the last failover time for
//Address resolution purposes
private volatile long _lastFailoverTime = 0;
@@ -294,6 +301,30 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_useLegacyMapMessageFormat = Boolean.getBoolean(ClientProperties.USE_LEGACY_MAP_MESSAGE_FORMAT);
}
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_STREAM_MESSAGE_FORMAT) != null)
+ {
+ _useLegacyStreamMessageFormat = Boolean.parseBoolean(
+ connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_STREAM_MESSAGE_FORMAT));
+ }
+ else
+ {
+ // use the default value set for all connections
+ _useLegacyStreamMessageFormat = System.getProperty(ClientProperties.USE_LEGACY_STREAM_MESSAGE_FORMAT) == null ?
+ true : Boolean.getBoolean(ClientProperties.USE_LEGACY_STREAM_MESSAGE_FORMAT);
+ }
+
+ if(connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND) != null)
+ {
+ _validateQueueOnSend = Boolean.parseBoolean(
+ connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND));
+ }
+ else
+ {
+ _validateQueueOnSend =
+ Boolean.parseBoolean(System.getProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "false"));
+ }
+
+
String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10");
if (_logger.isDebugEnabled())
{
@@ -1080,7 +1111,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _started;
}
- protected final boolean isConnected()
+ public final boolean isConnected()
{
return _connected;
}
@@ -1425,7 +1456,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _delegate.getProtocolVersion();
}
-
+
public String getBrokerUUID()
{
if(getProtocolVersion().equals(ProtocolVersion.v0_10))
@@ -1498,6 +1529,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _useLegacyMapMessageFormat;
}
+ public boolean isUseLegacyStreamMessageFormat()
+ {
+ return _useLegacyStreamMessageFormat;
+ }
+
private void verifyClientID() throws AMQException
{
if (Boolean.getBoolean(ClientProperties.QPID_VERIFY_CLIENT_ID))
@@ -1539,4 +1575,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
+ localAddress + " to " + remoteAddress);
}
}
+
+ void setHeartbeatListener(HeartbeatListener listener)
+ {
+ _delegate.setHeartbeatListener(listener);
+ }
+
+ public boolean validateQueueOnSend()
+ {
+ return _validateQueueOnSend;
+ }
}