summaryrefslogtreecommitdiff
path: root/java/common/src
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2013-10-30 21:38:03 +0000
committerKeith Wall <kwall@apache.org>2013-10-30 21:38:03 +0000
commit7202b9e68285695f650c2c05f1b3987b1960004c (patch)
tree814667982fd0bf861dca35f114a7250cc98c23b7 /java/common/src
parent1568bd8090c8b3df4463849ac27dd08452309335 (diff)
downloadqpid-python-7202b9e68285695f650c2c05f1b3987b1960004c.tar.gz
QPID-4534: unify client heartbeat system properties/connection url options.
* Connection url 'heartbeat' broker-option (and deprecated 'idle_timeout') now understood for all protocols * System property 'qpid.heartbeat' (and deprecated 'amqj.heartbeat.delay' and 'idle_timeout') now understood for all protocols * Enhanced heartbeat system tests * Docbook updates Original patch from Keith Wall, plus updates from Robbie Gemmell git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1537313 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
-rw-r--r--java/common/src/main/java/org/apache/qpid/configuration/Accessor.java136
-rw-r--r--java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java51
-rw-r--r--java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java24
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java1
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java20
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java60
-rw-r--r--java/common/src/test/java/org/apache/qpid/configuration/QpidPropertyTest.java19
-rw-r--r--java/common/src/test/java/org/apache/qpid/transport/ConnectionSettingsTest.java39
8 files changed, 202 insertions, 148 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java b/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java
index 517fd1829f..b73f08f824 100644
--- a/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java
+++ b/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java
@@ -20,13 +20,7 @@
*/
package org.apache.qpid.configuration;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
import java.util.Map;
-import java.util.Properties;
public interface Accessor
{
@@ -34,6 +28,7 @@ public interface Accessor
public Integer getInt(String name);
public Long getLong(String name);
public String getString(String name);
+ public Float getFloat(String name);
static class SystemPropertyAccessor implements Accessor
{
@@ -56,6 +51,11 @@ public interface Accessor
{
return System.getProperty(name);
}
+
+ public Float getFloat(String name)
+ {
+ return System.getProperty(name) == null ? null : Float.parseFloat(System.getProperty(name));
+ }
}
static class MapAccessor implements Accessor
@@ -147,132 +147,24 @@ public interface Accessor
return null;
}
}
- }
-
- static class PropertyFileAccessor extends MapAccessor
- {
- public PropertyFileAccessor(String fileName) throws FileNotFoundException, IOException
- {
- super(null);
- Properties props = new Properties();
- FileInputStream inStream = new FileInputStream(fileName);
- try
- {
- props.load(inStream);
- }
- finally
- {
- inStream.close();
- }
- setSource(props);
- }
-
-
- }
-
- static class CombinedAccessor implements Accessor
- {
- private List<Accessor> accessors;
-
- public CombinedAccessor(Accessor...accessors)
- {
- this.accessors = Arrays.asList(accessors);
- }
-
- public Boolean getBoolean(String name)
- {
- for (Accessor accessor: accessors)
- {
- if (accessor.getBoolean(name) != null)
- {
- return accessor.getBoolean(name);
- }
- }
- return null;
- }
- public Integer getInt(String name)
- {
- for (Accessor accessor: accessors)
- {
- if (accessor.getBoolean(name) != null)
- {
- return accessor.getInt(name);
- }
- }
- return null;
- }
-
- public Long getLong(String name)
+ public Float getFloat(String name)
{
- for (Accessor accessor: accessors)
+ if (source != null && source.containsKey(name))
{
- if (accessor.getBoolean(name) != null)
+ if (source.get(name) instanceof Float)
{
- return accessor.getLong(name);
+ return (Float)source.get(name);
}
- }
- return null;
- }
-
- public String getString(String name)
- {
- for (Accessor accessor: accessors)
- {
- if (accessor.getBoolean(name) != null)
+ else
{
- return accessor.getString(name);
+ return Float.parseFloat((String)source.get(name));
}
}
- return null;
- }
- }
-
- static class ValidationAccessor implements Accessor
- {
- private List<Validator> validators;
- private Accessor delegate;
-
- public ValidationAccessor(Accessor delegate,Validator...validators)
- {
- this.validators = Arrays.asList(validators);
- this.delegate = delegate;
- }
-
- public Boolean getBoolean(String name)
- {
- // there is nothing to validate in a boolean
- return delegate.getBoolean(name);
- }
-
- public Integer getInt(String name)
- {
- Integer v = delegate.getInt(name);
- for (Validator validator: validators)
- {
- validator.validate(v);
- }
- return v;
- }
-
- public Long getLong(String name)
- {
- Long v = delegate.getLong(name);
- for (Validator validator: validators)
- {
- validator.validate(v);
- }
- return v;
- }
-
- public String getString(String name)
- {
- String v = delegate.getString(name);
- for (Validator validator: validators)
+ else
{
- validator.validate(v);
+ return null;
}
- return v;
}
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
index 51dad51bf9..b43b9d450b 100644
--- a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
+++ b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
@@ -63,20 +63,47 @@ public class ClientProperties
public static final String SYNC_PUBLISH_PROP_NAME = "sync_publish";
/**
- * This value will be used in the following settings
- * To calculate the SO_TIMEOUT option of the socket (2*idle_timeout)
- * If this values is between the max and min values specified for heartbeat
- * by the broker in TuneOK it will be used as the heartbeat interval.
- * If not a warning will be printed and the max value specified for
- * heartbeat in TuneOK will be used
- *
- * The default idle timeout is set to 120 secs
+ * Frequency of heartbeat messages (in milliseconds)
+ * @see #QPID_HEARTBEAT_INTERVAL
*/
+ @Deprecated
public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout";
- public static final long DEFAULT_IDLE_TIMEOUT = 120000;
- public static final String HEARTBEAT = "qpid.heartbeat";
- public static final int HEARTBEAT_DEFAULT = 120;
+ /**
+ * Frequency of heartbeat messages (in seconds)
+ * @see #QPID_HEARTBEAT_INTERVAL
+ */
+ @Deprecated
+ public static final String AMQJ_HEARTBEAT_DELAY = "amqj.heartbeat.delay";
+
+ /**
+ * Frequency of heartbeat messages (in seconds)
+ */
+ public static final String QPID_HEARTBEAT_INTERVAL = "qpid.heartbeat";
+
+ /**
+ * Default heartbeat interval (used by 0-10 protocol).
+ */
+ public static final int QPID_HEARTBEAT_INTERVAL_010_DEFAULT = 120;
+
+ /**
+ * @see #QPID_HEARTBEAT_TIMEOUT_FACTOR
+ */
+ @Deprecated
+ public static final String AMQJ_HEARTBEAT_TIMEOUT_FACTOR = "amqj.heartbeat.timeoutFactor";
+
+ /**
+ * The factor applied to {@link #QPID_HEARTBEAT_INTERVAL} that determines the maximum
+ * length of time that may elapse before the peer is deemed to have failed.
+ *
+ * @see #QPID_HEARTBEAT_TIMEOUT_FACTOR_DEFAULT
+ */
+ public static final String QPID_HEARTBEAT_TIMEOUT_FACTOR = "qpid.heartbeat_timeout_factor";
+
+ /**
+ * Default heartbeat timeout factor.
+ */
+ public static final float QPID_HEARTBEAT_TIMEOUT_FACTOR_DEFAULT = 2.0f;
/**
* This value will be used to determine the default destination syntax type.
@@ -215,6 +242,8 @@ public class ClientProperties
*/
public static final String SET_EXPIRATION_AS_TTL = "qpid.set_expiration_as_ttl";
+
+
private ClientProperties()
{
//No instances
diff --git a/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java b/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java
index e0989495bb..3ed32a604a 100644
--- a/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java
+++ b/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java
@@ -102,6 +102,11 @@ public abstract class QpidProperty<T>
return new QpidStringProperty(accessor,defaultValue, names);
}
+ public static QpidProperty<Float> floatProperty(Float defaultValue, String... names)
+ {
+ return new QpidFloatProperty(defaultValue, names);
+ }
+
protected Accessor getAccessor()
{
return accessor;
@@ -183,4 +188,23 @@ public abstract class QpidProperty<T>
}
}
+ static class QpidFloatProperty extends QpidProperty<Float>
+ {
+ QpidFloatProperty(Float defValue, String... names)
+ {
+ super(defValue, names);
+ }
+
+ QpidFloatProperty(Accessor accessor,Float defValue, String... names)
+ {
+ super(accessor,defValue, names);
+ }
+
+ @Override
+ protected Float getByName(String name)
+ {
+ return getAccessor().getFloat(name);
+ }
+ }
+
} \ No newline at end of file
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
index 185c01d3df..33604b05d9 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
@@ -62,6 +62,5 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto
public void setSender(Sender<ByteBuffer> sender);
- public void init();
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
index c75adab444..75eb0e19a7 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
@@ -133,15 +133,17 @@ public class ClientDelegate extends ConnectionDelegate
@Override
public void connectionTune(Connection conn, ConnectionTune tune)
{
- int hb_interval = calculateHeartbeatInterval(_connectionSettings.getHeartbeatInterval(),
- tune.getHeartbeatMin(),
- tune.getHeartbeatMax()
- );
+ int heartbeatInterval = _connectionSettings.getHeartbeatInterval010();
+ float heartbeatTimeoutFactor = _connectionSettings.getHeartbeatTimeoutFactor();
+ int actualHeartbeatInterval = calculateHeartbeatInterval(heartbeatInterval,
+ tune.getHeartbeatMin(),
+ tune.getHeartbeatMax());
conn.connectionTuneOk(tune.getChannelMax(),
tune.getMaxFrameSize(),
- hb_interval);
- // The idle timeout is twice the heartbeat amount (in milisecs)
- conn.setIdleTimeout(hb_interval*1000*2);
+ actualHeartbeatInterval);
+
+ int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor);
+ conn.setIdleTimeout(idleTimeout);
int channelMax = tune.getChannelMax();
//0 means no implied limit, except available server resources
@@ -184,7 +186,7 @@ public class ClientDelegate extends ConnectionDelegate
int i = heartbeat;
if (i == 0)
{
- log.info("Idle timeout is 0 sec. Heartbeats are disabled.");
+ log.info("Heartbeat interval is 0 sec. Heartbeats are disabled.");
return 0; // heartbeats are disabled.
}
else if (i >= min && i <= max)
@@ -193,7 +195,7 @@ public class ClientDelegate extends ConnectionDelegate
}
else
{
- log.info("The broker does not support the configured connection idle timeout of %s sec," +
+ log.info("The broker does not support the configured connection heartbeat interval of %s sec," +
" using the brokers max supported value of %s sec instead.", i,max);
return max;
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
index 14dfeb18ec..2ff08fd751 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
@@ -20,6 +20,13 @@
*/
package org.apache.qpid.transport;
+import static org.apache.qpid.configuration.ClientProperties.AMQJ_HEARTBEAT_DELAY;
+import static org.apache.qpid.configuration.ClientProperties.AMQJ_HEARTBEAT_TIMEOUT_FACTOR;
+import static org.apache.qpid.configuration.ClientProperties.IDLE_TIMEOUT_PROP_NAME;
+import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_INTERVAL;
+import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_INTERVAL_010_DEFAULT;
+import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_TIMEOUT_FACTOR;
+import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_TIMEOUT_FACTOR_DEFAULT;
import static org.apache.qpid.configuration.ClientProperties.AMQJ_TCP_NODELAY_PROP_NAME;
import static org.apache.qpid.configuration.ClientProperties.QPID_SSL_KEY_MANAGER_FACTORY_ALGORITHM_PROP_NAME;
import static org.apache.qpid.configuration.ClientProperties.QPID_SSL_KEY_STORE_CERT_TYPE_PROP_NAME;
@@ -50,6 +57,7 @@ public class ConnectionSettings
{
public static final String WILDCARD_ADDRESS = "*";
+
private String protocol = "tcp";
private String host = "localhost";
private String vhost;
@@ -59,7 +67,9 @@ public class ConnectionSettings
private boolean tcpNodelay = QpidProperty.booleanProperty(Boolean.TRUE, QPID_TCP_NODELAY_PROP_NAME, AMQJ_TCP_NODELAY_PROP_NAME).get();
private int maxChannelCount = 32767;
private int maxFrameSize = 65535;
- private int heartbeatInterval;
+ private Integer hearbeatIntervalLegacyMs = QpidProperty.intProperty(null, IDLE_TIMEOUT_PROP_NAME).get();
+ private Integer heartbeatInterval = QpidProperty.intProperty(null, QPID_HEARTBEAT_INTERVAL, AMQJ_HEARTBEAT_DELAY).get();
+ private float heartbeatTimeoutFactor = QpidProperty.floatProperty(QPID_HEARTBEAT_TIMEOUT_FACTOR_DEFAULT, QPID_HEARTBEAT_TIMEOUT_FACTOR, AMQJ_HEARTBEAT_TIMEOUT_FACTOR).get();
private int connectTimeout = 30000;
private int readBufferSize = QpidProperty.intProperty(65535, RECEIVE_BUFFER_SIZE_PROP_NAME, LEGACY_RECEIVE_BUFFER_SIZE_PROP_NAME).get();
private int writeBufferSize = QpidProperty.intProperty(65535, SEND_BUFFER_SIZE_PROP_NAME, LEGACY_SEND_BUFFER_SIZE_PROP_NAME).get();;
@@ -95,9 +105,45 @@ public class ConnectionSettings
this.tcpNodelay = tcpNodelay;
}
- public int getHeartbeatInterval()
- {
- return heartbeatInterval;
+ /**
+ * Gets the heartbeat interval (seconds) for 0-8/9/9-1 protocols.
+ * 0 means heartbeating is disabled.
+ * null means use the broker-supplied value.
+ */
+ public Integer getHeartbeatInterval08()
+ {
+ if (heartbeatInterval != null)
+ {
+ return heartbeatInterval;
+ }
+ else if (hearbeatIntervalLegacyMs != null)
+ {
+ return hearbeatIntervalLegacyMs / 1000;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ /**
+ * Gets the heartbeat interval (seconds) for the 0-10 protocol.
+ * 0 means heartbeating is disabled.
+ */
+ public int getHeartbeatInterval010()
+ {
+ if (heartbeatInterval != null)
+ {
+ return heartbeatInterval;
+ }
+ else if (hearbeatIntervalLegacyMs != null)
+ {
+ return hearbeatIntervalLegacyMs / 1000;
+ }
+ else
+ {
+ return QPID_HEARTBEAT_INTERVAL_010_DEFAULT;
+ }
}
public void setHeartbeatInterval(int heartbeatInterval)
@@ -105,6 +151,11 @@ public class ConnectionSettings
this.heartbeatInterval = heartbeatInterval;
}
+ public float getHeartbeatTimeoutFactor()
+ {
+ return this.heartbeatTimeoutFactor;
+ }
+
public String getProtocol()
{
return protocol;
@@ -374,4 +425,5 @@ public class ConnectionSettings
{
this.writeBufferSize = writeBufferSize;
}
+
}
diff --git a/java/common/src/test/java/org/apache/qpid/configuration/QpidPropertyTest.java b/java/common/src/test/java/org/apache/qpid/configuration/QpidPropertyTest.java
index 2a8c177f64..335270264c 100644
--- a/java/common/src/test/java/org/apache/qpid/configuration/QpidPropertyTest.java
+++ b/java/common/src/test/java/org/apache/qpid/configuration/QpidPropertyTest.java
@@ -145,6 +145,25 @@ public class QpidPropertyTest extends QpidTestCase
assertEquals(expectedValue, propertyValue);
}
+ public void testFloatValueReadFromSystemProperty() throws Exception
+ {
+ float expectedValue = 1.5f;
+ setTestSystemProperty(_systemPropertyName, Float.valueOf(expectedValue).toString());
+ assertSystemPropertiesSet(_systemPropertyName);
+
+ float propertyValue = QpidProperty.floatProperty(1.5f, _systemPropertyName).get();
+ assertEquals(expectedValue, propertyValue, 0.1);
+ }
+
+ public void testFloatValueIsDefaultWhenOneSystemPropertyIsNotSet() throws Exception
+ {
+ float expectedValue = 1.5f;
+ assertSystemPropertiesNotSet(_systemPropertyName);
+
+ float propertyValue = QpidProperty.floatProperty(expectedValue, _systemPropertyName).get();
+ assertEquals(expectedValue, propertyValue, 0.1);
+ }
+
private void assertSystemPropertiesSet(String... systemPropertyNames)
{
for (String systemPropertyName : systemPropertyNames)
diff --git a/java/common/src/test/java/org/apache/qpid/transport/ConnectionSettingsTest.java b/java/common/src/test/java/org/apache/qpid/transport/ConnectionSettingsTest.java
index fc4f5374f0..d031842f9d 100644
--- a/java/common/src/test/java/org/apache/qpid/transport/ConnectionSettingsTest.java
+++ b/java/common/src/test/java/org/apache/qpid/transport/ConnectionSettingsTest.java
@@ -128,11 +128,48 @@ public class ConnectionSettingsTest extends QpidTestCase
}
@SuppressWarnings("deprecation")
- public void testtestReceiveBufferSizeOverriddenLegacyOverridden()
+ public void testReceiveBufferSizeOverriddenLegacyOverridden()
{
systemPropertyOverrideForSocketBufferSize(ClientProperties.LEGACY_RECEIVE_BUFFER_SIZE_PROP_NAME, 1024, true);
}
+ public void testHeartbeatingDefaults()
+ {
+ assertNull(_conConnectionSettings.getHeartbeatInterval08());
+ assertEquals(ClientProperties.QPID_HEARTBEAT_INTERVAL_010_DEFAULT,_conConnectionSettings.getHeartbeatInterval010());
+ assertEquals(2.0, _conConnectionSettings.getHeartbeatTimeoutFactor(), 0.1);
+ }
+
+ public void testHeartbeatingOverridden()
+ {
+ resetSystemProperty(ClientProperties.QPID_HEARTBEAT_INTERVAL, "60");
+ resetSystemProperty(ClientProperties.QPID_HEARTBEAT_TIMEOUT_FACTOR, "2.5");
+
+ assertEquals(Integer.valueOf(60), _conConnectionSettings.getHeartbeatInterval08());
+ assertEquals(60, _conConnectionSettings.getHeartbeatInterval010());
+ assertEquals(2.5, _conConnectionSettings.getHeartbeatTimeoutFactor(), 0.1);
+ }
+
+ @SuppressWarnings("deprecation")
+ public void testHeartbeatingOverriddenUsingAmqjLegacyOption()
+ {
+ resetSystemProperty(ClientProperties.AMQJ_HEARTBEAT_DELAY, "30");
+ resetSystemProperty(ClientProperties.AMQJ_HEARTBEAT_TIMEOUT_FACTOR, "1.5");
+
+ assertEquals(Integer.valueOf(30), _conConnectionSettings.getHeartbeatInterval08());
+ assertEquals(30, _conConnectionSettings.getHeartbeatInterval010());
+ assertEquals(1.5, _conConnectionSettings.getHeartbeatTimeoutFactor(), 0.1);
+ }
+
+ @SuppressWarnings("deprecation")
+ public void testHeartbeatingOverriddenUsingOlderLegacyOption()
+ {
+ resetSystemProperty(ClientProperties.IDLE_TIMEOUT_PROP_NAME, "30000");
+
+ assertEquals(Integer.valueOf(30), _conConnectionSettings.getHeartbeatInterval08());
+ assertEquals(30, _conConnectionSettings.getHeartbeatInterval010());
+ }
+
private void systemPropertyOverrideForTcpDelay(String propertyName, boolean value)
{
resetSystemProperty(propertyName, String.valueOf(value));