diff options
author | Keith Wall <kwall@apache.org> | 2013-10-30 21:38:03 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2013-10-30 21:38:03 +0000 |
commit | 7202b9e68285695f650c2c05f1b3987b1960004c (patch) | |
tree | 814667982fd0bf861dca35f114a7250cc98c23b7 /java/common/src | |
parent | 1568bd8090c8b3df4463849ac27dd08452309335 (diff) | |
download | qpid-python-7202b9e68285695f650c2c05f1b3987b1960004c.tar.gz |
QPID-4534: unify client heartbeat system properties/connection url options.
* Connection url 'heartbeat' broker-option (and deprecated 'idle_timeout') now understood for all protocols
* System property 'qpid.heartbeat' (and deprecated 'amqj.heartbeat.delay' and 'idle_timeout') now understood for all protocols
* Enhanced heartbeat system tests
* Docbook updates
Original patch from Keith Wall, plus updates from Robbie Gemmell
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1537313 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
8 files changed, 202 insertions, 148 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java b/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java index 517fd1829f..b73f08f824 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java @@ -20,13 +20,7 @@ */ package org.apache.qpid.configuration; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Arrays; -import java.util.List; import java.util.Map; -import java.util.Properties; public interface Accessor { @@ -34,6 +28,7 @@ public interface Accessor public Integer getInt(String name); public Long getLong(String name); public String getString(String name); + public Float getFloat(String name); static class SystemPropertyAccessor implements Accessor { @@ -56,6 +51,11 @@ public interface Accessor { return System.getProperty(name); } + + public Float getFloat(String name) + { + return System.getProperty(name) == null ? null : Float.parseFloat(System.getProperty(name)); + } } static class MapAccessor implements Accessor @@ -147,132 +147,24 @@ public interface Accessor return null; } } - } - - static class PropertyFileAccessor extends MapAccessor - { - public PropertyFileAccessor(String fileName) throws FileNotFoundException, IOException - { - super(null); - Properties props = new Properties(); - FileInputStream inStream = new FileInputStream(fileName); - try - { - props.load(inStream); - } - finally - { - inStream.close(); - } - setSource(props); - } - - - } - - static class CombinedAccessor implements Accessor - { - private List<Accessor> accessors; - - public CombinedAccessor(Accessor...accessors) - { - this.accessors = Arrays.asList(accessors); - } - - public Boolean getBoolean(String name) - { - for (Accessor accessor: accessors) - { - if (accessor.getBoolean(name) != null) - { - return accessor.getBoolean(name); - } - } - return null; - } - public Integer getInt(String name) - { - for (Accessor accessor: accessors) - { - if (accessor.getBoolean(name) != null) - { - return accessor.getInt(name); - } - } - return null; - } - - public Long getLong(String name) + public Float getFloat(String name) { - for (Accessor accessor: accessors) + if (source != null && source.containsKey(name)) { - if (accessor.getBoolean(name) != null) + if (source.get(name) instanceof Float) { - return accessor.getLong(name); + return (Float)source.get(name); } - } - return null; - } - - public String getString(String name) - { - for (Accessor accessor: accessors) - { - if (accessor.getBoolean(name) != null) + else { - return accessor.getString(name); + return Float.parseFloat((String)source.get(name)); } } - return null; - } - } - - static class ValidationAccessor implements Accessor - { - private List<Validator> validators; - private Accessor delegate; - - public ValidationAccessor(Accessor delegate,Validator...validators) - { - this.validators = Arrays.asList(validators); - this.delegate = delegate; - } - - public Boolean getBoolean(String name) - { - // there is nothing to validate in a boolean - return delegate.getBoolean(name); - } - - public Integer getInt(String name) - { - Integer v = delegate.getInt(name); - for (Validator validator: validators) - { - validator.validate(v); - } - return v; - } - - public Long getLong(String name) - { - Long v = delegate.getLong(name); - for (Validator validator: validators) - { - validator.validate(v); - } - return v; - } - - public String getString(String name) - { - String v = delegate.getString(name); - for (Validator validator: validators) + else { - validator.validate(v); + return null; } - return v; } } } diff --git a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java index 51dad51bf9..b43b9d450b 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java @@ -63,20 +63,47 @@ public class ClientProperties public static final String SYNC_PUBLISH_PROP_NAME = "sync_publish"; /** - * This value will be used in the following settings - * To calculate the SO_TIMEOUT option of the socket (2*idle_timeout) - * If this values is between the max and min values specified for heartbeat - * by the broker in TuneOK it will be used as the heartbeat interval. - * If not a warning will be printed and the max value specified for - * heartbeat in TuneOK will be used - * - * The default idle timeout is set to 120 secs + * Frequency of heartbeat messages (in milliseconds) + * @see #QPID_HEARTBEAT_INTERVAL */ + @Deprecated public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout"; - public static final long DEFAULT_IDLE_TIMEOUT = 120000; - public static final String HEARTBEAT = "qpid.heartbeat"; - public static final int HEARTBEAT_DEFAULT = 120; + /** + * Frequency of heartbeat messages (in seconds) + * @see #QPID_HEARTBEAT_INTERVAL + */ + @Deprecated + public static final String AMQJ_HEARTBEAT_DELAY = "amqj.heartbeat.delay"; + + /** + * Frequency of heartbeat messages (in seconds) + */ + public static final String QPID_HEARTBEAT_INTERVAL = "qpid.heartbeat"; + + /** + * Default heartbeat interval (used by 0-10 protocol). + */ + public static final int QPID_HEARTBEAT_INTERVAL_010_DEFAULT = 120; + + /** + * @see #QPID_HEARTBEAT_TIMEOUT_FACTOR + */ + @Deprecated + public static final String AMQJ_HEARTBEAT_TIMEOUT_FACTOR = "amqj.heartbeat.timeoutFactor"; + + /** + * The factor applied to {@link #QPID_HEARTBEAT_INTERVAL} that determines the maximum + * length of time that may elapse before the peer is deemed to have failed. + * + * @see #QPID_HEARTBEAT_TIMEOUT_FACTOR_DEFAULT + */ + public static final String QPID_HEARTBEAT_TIMEOUT_FACTOR = "qpid.heartbeat_timeout_factor"; + + /** + * Default heartbeat timeout factor. + */ + public static final float QPID_HEARTBEAT_TIMEOUT_FACTOR_DEFAULT = 2.0f; /** * This value will be used to determine the default destination syntax type. @@ -215,6 +242,8 @@ public class ClientProperties */ public static final String SET_EXPIRATION_AS_TTL = "qpid.set_expiration_as_ttl"; + + private ClientProperties() { //No instances diff --git a/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java b/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java index e0989495bb..3ed32a604a 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java @@ -102,6 +102,11 @@ public abstract class QpidProperty<T> return new QpidStringProperty(accessor,defaultValue, names); } + public static QpidProperty<Float> floatProperty(Float defaultValue, String... names) + { + return new QpidFloatProperty(defaultValue, names); + } + protected Accessor getAccessor() { return accessor; @@ -183,4 +188,23 @@ public abstract class QpidProperty<T> } } + static class QpidFloatProperty extends QpidProperty<Float> + { + QpidFloatProperty(Float defValue, String... names) + { + super(defValue, names); + } + + QpidFloatProperty(Accessor accessor,Float defValue, String... names) + { + super(accessor,defValue, names); + } + + @Override + protected Float getByName(String name) + { + return getAccessor().getFloat(name); + } + } + }
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java index 185c01d3df..33604b05d9 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java @@ -62,6 +62,5 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto public void setSender(Sender<ByteBuffer> sender); - public void init(); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index c75adab444..75eb0e19a7 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -133,15 +133,17 @@ public class ClientDelegate extends ConnectionDelegate @Override public void connectionTune(Connection conn, ConnectionTune tune) { - int hb_interval = calculateHeartbeatInterval(_connectionSettings.getHeartbeatInterval(), - tune.getHeartbeatMin(), - tune.getHeartbeatMax() - ); + int heartbeatInterval = _connectionSettings.getHeartbeatInterval010(); + float heartbeatTimeoutFactor = _connectionSettings.getHeartbeatTimeoutFactor(); + int actualHeartbeatInterval = calculateHeartbeatInterval(heartbeatInterval, + tune.getHeartbeatMin(), + tune.getHeartbeatMax()); conn.connectionTuneOk(tune.getChannelMax(), tune.getMaxFrameSize(), - hb_interval); - // The idle timeout is twice the heartbeat amount (in milisecs) - conn.setIdleTimeout(hb_interval*1000*2); + actualHeartbeatInterval); + + int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor); + conn.setIdleTimeout(idleTimeout); int channelMax = tune.getChannelMax(); //0 means no implied limit, except available server resources @@ -184,7 +186,7 @@ public class ClientDelegate extends ConnectionDelegate int i = heartbeat; if (i == 0) { - log.info("Idle timeout is 0 sec. Heartbeats are disabled."); + log.info("Heartbeat interval is 0 sec. Heartbeats are disabled."); return 0; // heartbeats are disabled. } else if (i >= min && i <= max) @@ -193,7 +195,7 @@ public class ClientDelegate extends ConnectionDelegate } else { - log.info("The broker does not support the configured connection idle timeout of %s sec," + + log.info("The broker does not support the configured connection heartbeat interval of %s sec," + " using the brokers max supported value of %s sec instead.", i,max); return max; } diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java index 14dfeb18ec..2ff08fd751 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java @@ -20,6 +20,13 @@ */ package org.apache.qpid.transport; +import static org.apache.qpid.configuration.ClientProperties.AMQJ_HEARTBEAT_DELAY; +import static org.apache.qpid.configuration.ClientProperties.AMQJ_HEARTBEAT_TIMEOUT_FACTOR; +import static org.apache.qpid.configuration.ClientProperties.IDLE_TIMEOUT_PROP_NAME; +import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_INTERVAL; +import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_INTERVAL_010_DEFAULT; +import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_TIMEOUT_FACTOR; +import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_TIMEOUT_FACTOR_DEFAULT; import static org.apache.qpid.configuration.ClientProperties.AMQJ_TCP_NODELAY_PROP_NAME; import static org.apache.qpid.configuration.ClientProperties.QPID_SSL_KEY_MANAGER_FACTORY_ALGORITHM_PROP_NAME; import static org.apache.qpid.configuration.ClientProperties.QPID_SSL_KEY_STORE_CERT_TYPE_PROP_NAME; @@ -50,6 +57,7 @@ public class ConnectionSettings { public static final String WILDCARD_ADDRESS = "*"; + private String protocol = "tcp"; private String host = "localhost"; private String vhost; @@ -59,7 +67,9 @@ public class ConnectionSettings private boolean tcpNodelay = QpidProperty.booleanProperty(Boolean.TRUE, QPID_TCP_NODELAY_PROP_NAME, AMQJ_TCP_NODELAY_PROP_NAME).get(); private int maxChannelCount = 32767; private int maxFrameSize = 65535; - private int heartbeatInterval; + private Integer hearbeatIntervalLegacyMs = QpidProperty.intProperty(null, IDLE_TIMEOUT_PROP_NAME).get(); + private Integer heartbeatInterval = QpidProperty.intProperty(null, QPID_HEARTBEAT_INTERVAL, AMQJ_HEARTBEAT_DELAY).get(); + private float heartbeatTimeoutFactor = QpidProperty.floatProperty(QPID_HEARTBEAT_TIMEOUT_FACTOR_DEFAULT, QPID_HEARTBEAT_TIMEOUT_FACTOR, AMQJ_HEARTBEAT_TIMEOUT_FACTOR).get(); private int connectTimeout = 30000; private int readBufferSize = QpidProperty.intProperty(65535, RECEIVE_BUFFER_SIZE_PROP_NAME, LEGACY_RECEIVE_BUFFER_SIZE_PROP_NAME).get(); private int writeBufferSize = QpidProperty.intProperty(65535, SEND_BUFFER_SIZE_PROP_NAME, LEGACY_SEND_BUFFER_SIZE_PROP_NAME).get();; @@ -95,9 +105,45 @@ public class ConnectionSettings this.tcpNodelay = tcpNodelay; } - public int getHeartbeatInterval() - { - return heartbeatInterval; + /** + * Gets the heartbeat interval (seconds) for 0-8/9/9-1 protocols. + * 0 means heartbeating is disabled. + * null means use the broker-supplied value. + */ + public Integer getHeartbeatInterval08() + { + if (heartbeatInterval != null) + { + return heartbeatInterval; + } + else if (hearbeatIntervalLegacyMs != null) + { + return hearbeatIntervalLegacyMs / 1000; + } + else + { + return null; + } + } + + /** + * Gets the heartbeat interval (seconds) for the 0-10 protocol. + * 0 means heartbeating is disabled. + */ + public int getHeartbeatInterval010() + { + if (heartbeatInterval != null) + { + return heartbeatInterval; + } + else if (hearbeatIntervalLegacyMs != null) + { + return hearbeatIntervalLegacyMs / 1000; + } + else + { + return QPID_HEARTBEAT_INTERVAL_010_DEFAULT; + } } public void setHeartbeatInterval(int heartbeatInterval) @@ -105,6 +151,11 @@ public class ConnectionSettings this.heartbeatInterval = heartbeatInterval; } + public float getHeartbeatTimeoutFactor() + { + return this.heartbeatTimeoutFactor; + } + public String getProtocol() { return protocol; @@ -374,4 +425,5 @@ public class ConnectionSettings { this.writeBufferSize = writeBufferSize; } + } diff --git a/java/common/src/test/java/org/apache/qpid/configuration/QpidPropertyTest.java b/java/common/src/test/java/org/apache/qpid/configuration/QpidPropertyTest.java index 2a8c177f64..335270264c 100644 --- a/java/common/src/test/java/org/apache/qpid/configuration/QpidPropertyTest.java +++ b/java/common/src/test/java/org/apache/qpid/configuration/QpidPropertyTest.java @@ -145,6 +145,25 @@ public class QpidPropertyTest extends QpidTestCase assertEquals(expectedValue, propertyValue); } + public void testFloatValueReadFromSystemProperty() throws Exception + { + float expectedValue = 1.5f; + setTestSystemProperty(_systemPropertyName, Float.valueOf(expectedValue).toString()); + assertSystemPropertiesSet(_systemPropertyName); + + float propertyValue = QpidProperty.floatProperty(1.5f, _systemPropertyName).get(); + assertEquals(expectedValue, propertyValue, 0.1); + } + + public void testFloatValueIsDefaultWhenOneSystemPropertyIsNotSet() throws Exception + { + float expectedValue = 1.5f; + assertSystemPropertiesNotSet(_systemPropertyName); + + float propertyValue = QpidProperty.floatProperty(expectedValue, _systemPropertyName).get(); + assertEquals(expectedValue, propertyValue, 0.1); + } + private void assertSystemPropertiesSet(String... systemPropertyNames) { for (String systemPropertyName : systemPropertyNames) diff --git a/java/common/src/test/java/org/apache/qpid/transport/ConnectionSettingsTest.java b/java/common/src/test/java/org/apache/qpid/transport/ConnectionSettingsTest.java index fc4f5374f0..d031842f9d 100644 --- a/java/common/src/test/java/org/apache/qpid/transport/ConnectionSettingsTest.java +++ b/java/common/src/test/java/org/apache/qpid/transport/ConnectionSettingsTest.java @@ -128,11 +128,48 @@ public class ConnectionSettingsTest extends QpidTestCase } @SuppressWarnings("deprecation") - public void testtestReceiveBufferSizeOverriddenLegacyOverridden() + public void testReceiveBufferSizeOverriddenLegacyOverridden() { systemPropertyOverrideForSocketBufferSize(ClientProperties.LEGACY_RECEIVE_BUFFER_SIZE_PROP_NAME, 1024, true); } + public void testHeartbeatingDefaults() + { + assertNull(_conConnectionSettings.getHeartbeatInterval08()); + assertEquals(ClientProperties.QPID_HEARTBEAT_INTERVAL_010_DEFAULT,_conConnectionSettings.getHeartbeatInterval010()); + assertEquals(2.0, _conConnectionSettings.getHeartbeatTimeoutFactor(), 0.1); + } + + public void testHeartbeatingOverridden() + { + resetSystemProperty(ClientProperties.QPID_HEARTBEAT_INTERVAL, "60"); + resetSystemProperty(ClientProperties.QPID_HEARTBEAT_TIMEOUT_FACTOR, "2.5"); + + assertEquals(Integer.valueOf(60), _conConnectionSettings.getHeartbeatInterval08()); + assertEquals(60, _conConnectionSettings.getHeartbeatInterval010()); + assertEquals(2.5, _conConnectionSettings.getHeartbeatTimeoutFactor(), 0.1); + } + + @SuppressWarnings("deprecation") + public void testHeartbeatingOverriddenUsingAmqjLegacyOption() + { + resetSystemProperty(ClientProperties.AMQJ_HEARTBEAT_DELAY, "30"); + resetSystemProperty(ClientProperties.AMQJ_HEARTBEAT_TIMEOUT_FACTOR, "1.5"); + + assertEquals(Integer.valueOf(30), _conConnectionSettings.getHeartbeatInterval08()); + assertEquals(30, _conConnectionSettings.getHeartbeatInterval010()); + assertEquals(1.5, _conConnectionSettings.getHeartbeatTimeoutFactor(), 0.1); + } + + @SuppressWarnings("deprecation") + public void testHeartbeatingOverriddenUsingOlderLegacyOption() + { + resetSystemProperty(ClientProperties.IDLE_TIMEOUT_PROP_NAME, "30000"); + + assertEquals(Integer.valueOf(30), _conConnectionSettings.getHeartbeatInterval08()); + assertEquals(30, _conConnectionSettings.getHeartbeatInterval010()); + } + private void systemPropertyOverrideForTcpDelay(String propertyName, boolean value) { resetSystemProperty(propertyName, String.valueOf(value)); |