diff options
author | Keith Wall <kwall@apache.org> | 2013-10-30 21:38:03 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2013-10-30 21:38:03 +0000 |
commit | 7202b9e68285695f650c2c05f1b3987b1960004c (patch) | |
tree | 814667982fd0bf861dca35f114a7250cc98c23b7 /java | |
parent | 1568bd8090c8b3df4463849ac27dd08452309335 (diff) | |
download | qpid-python-7202b9e68285695f650c2c05f1b3987b1960004c.tar.gz |
QPID-4534: unify client heartbeat system properties/connection url options.
* Connection url 'heartbeat' broker-option (and deprecated 'idle_timeout') now understood for all protocols
* System property 'qpid.heartbeat' (and deprecated 'amqj.heartbeat.delay' and 'idle_timeout') now understood for all protocols
* Enhanced heartbeat system tests
* Docbook updates
Original patch from Keith Wall, plus updates from Robbie Gemmell
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1537313 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
22 files changed, 416 insertions, 321 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 33300e9e59..4cacae134e 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -1287,11 +1287,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } - public void init() - { - // Do nothing - } - public void setSender(Sender<ByteBuffer> sender) { // Do nothing diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index 597096db57..25d37aafb1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -290,6 +290,19 @@ public class AMQBrokerDetails implements BrokerDetails } } + private int getIntegerProperty(String key) + { + String stringValue = getProperty(key); + try + { + return Integer.parseInt(stringValue); + } + catch (NumberFormatException e) + { + throw new IllegalArgumentException("Cannot parse key " + key + " with value '" + stringValue + "' as integer.", e); + } + } + public String toString() { StringBuffer sb = new StringBuffer(); @@ -464,6 +477,16 @@ public class AMQBrokerDetails implements BrokerDetails conSettings.setConnectTimeout(lookupConnectTimeout()); + if (getProperty(BrokerDetails.OPTIONS_HEARTBEAT) != null) + { + conSettings.setHeartbeatInterval(getIntegerProperty(BrokerDetails.OPTIONS_HEARTBEAT)); + } + else if (getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null) + { + conSettings.setHeartbeatInterval(getIntegerProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) / 1000); + } + return conSettings; } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 66590aa0d7..95b1178407 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -29,7 +29,7 @@ import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.transport.ClientConnectionDelegate; import org.apache.qpid.common.ServerPropertyNames; -import org.apache.qpid.configuration.ClientProperties; + import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ChannelLimitReachedException; @@ -448,8 +448,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec // Ignore } - conSettings.setHeartbeatInterval(getHeartbeatInterval(brokerDetail)); - //Check connection-level ssl override setting String connectionSslOption = _conn.getConnectionURL().getOption(ConnectionURL.OPTIONS_SSL); if(connectionSslOption != null) @@ -470,37 +468,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec return conSettings; } - - // The idle_timeout prop is in milisecs while - // the new heartbeat prop is in secs - private int getHeartbeatInterval(BrokerDetails brokerDetail) - { - int heartbeat = 0; - if (brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null) - { - _logger.warn("Broker property idle_timeout=<mili_secs> is deprecated, please use heartbeat=<secs>"); - heartbeat = Integer.parseInt(brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT))/1000; - } - else if (brokerDetail.getProperty(BrokerDetails.OPTIONS_HEARTBEAT) != null) - { - heartbeat = Integer.parseInt(brokerDetail.getProperty(BrokerDetails.OPTIONS_HEARTBEAT)); - } - else if (Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME) != null) - { - heartbeat = Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME)/1000; - _logger.warn("JVM arg -Didle_timeout=<mili_secs> is deprecated, please use -Dqpid.heartbeat=<secs>"); - } - else if(Integer.getInteger(ClientProperties.HEARTBEAT) != null) - { - heartbeat = Integer.getInteger(ClientProperties.HEARTBEAT,ClientProperties.HEARTBEAT_DEFAULT); - } - else - { - heartbeat = Integer.getInteger("amqj.heartbeat.delay", ClientProperties.HEARTBEAT_DEFAULT); - } - return heartbeat; - } - protected org.apache.qpid.transport.Connection getQpidConnection() { return _qpidConnection; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 340aca70eb..dfbf7ec60a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -124,10 +124,11 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()), _conn.getProtocolHandler()); + _conn.getProtocolHandler().setNetworkConnection(network, securityLayer.sender(network.getSender())); StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates); - _conn.getProtocolHandler().getProtocolSession().init(); + _conn.getProtocolHandler().getProtocolSession().init(settings); // this blocks until the connection has been set up or when an error // has prevented the connection being set up diff --git a/java/client/src/main/java/org/apache/qpid/client/ConnectionTuneParameters.java b/java/client/src/main/java/org/apache/qpid/client/ConnectionTuneParameters.java index b1ec7216bc..40264f837e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/ConnectionTuneParameters.java +++ b/java/client/src/main/java/org/apache/qpid/client/ConnectionTuneParameters.java @@ -26,9 +26,20 @@ public class ConnectionTuneParameters private int _channelMax; - private int _heartbeat; + /** Heart-beating interval in seconds, null if not set, use 0 to disable */ + private Integer _heartbeat; - private long _txnLimit; + private float _heartbeatTimeoutFactor; + + public float getHeartbeatTimeoutFactor() + { + return _heartbeatTimeoutFactor; + } + + public void setHeartbeatTimeoutFactor(float heartbeatTimeoutFactor) + { + _heartbeatTimeoutFactor = heartbeatTimeoutFactor; + } public long getFrameMax() { @@ -50,23 +61,13 @@ public class ConnectionTuneParameters _channelMax = channelMax; } - public int getHeartbeat() + public Integer getHeartbeat() { return _heartbeat; } - public void setHeartbeat(int hearbeat) + public void setHeartbeat(Integer hearbeat) { _heartbeat = hearbeat; } - - public long getTxnLimit() - { - return _txnLimit; - } - - public void setTxnLimit(long txnLimit) - { - _txnLimit = txnLimit; - } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java index f77718672e..617380e149 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java @@ -52,20 +52,20 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener<Con _logger.debug("ConnectionTune frame received"); final MethodRegistry methodRegistry = session.getMethodRegistry(); - ConnectionTuneParameters params = session.getConnectionTuneParameters(); - if (params == null) - { - params = new ConnectionTuneParameters(); - } - + int maxChannelNumber = frame.getChannelMax(); //0 implies no limit, except that forced by protocol limitations (0xFFFF) params.setChannelMax(maxChannelNumber == 0 ? AMQProtocolSession.MAX_CHANNEL_MAX : maxChannelNumber); - params.setFrameMax(frame.getFrameMax()); - params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.getHeartbeat())); - session.setConnectionTuneParameters(params); + + //if the heart beat delay hasn't been configured, we use the broker-supplied value + if (params.getHeartbeat() == null) + { + params.setHeartbeat(frame.getHeartbeat()); + } + + session.tuneConnection(params); session.getStateManager().changeState(AMQState.CONNECTION_NOT_OPENED); diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 816caac824..37e6904378 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -902,13 +902,13 @@ public class AMQProtocolHandler implements ProtocolEngine return _sender; } - /** @param delay delay in seconds (not ms) */ - void initHeartbeats(int delay) + void initHeartbeats(int delay, float timeoutFactor) { if (delay > 0) { _network.setMaxWriteIdle(delay); - _network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); + int readerIdle = (int)(delay * timeoutFactor); + _network.setMaxReadIdle(readerIdle); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 67bd8de846..4027ccb725 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -43,6 +43,7 @@ import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; @@ -63,18 +64,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession protected static final Logger _logger = LoggerFactory.getLogger(AMQProtocolSession.class); - public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived"; - //Usable channels are numbered 1 to <ChannelMax> public static final int MAX_CHANNEL_MAX = 0xFFFF; public static final int MIN_USABLE_CHANNEL_NUM = 1; - protected static final String CONNECTION_TUNE_PARAMETERS = "ConnectionTuneParameters"; - - protected static final String AMQ_CONNECTION = "AMQConnection"; - - protected static final String SASL_CLIENT = "SASLClient"; - private final AMQProtocolHandler _protocolHandler; private ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>(); @@ -120,13 +113,38 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession _connection = connection; } - public void init() + public void init(ConnectionSettings settings) { // start the process of setting up the connection. This is the first place that // data is written to the server. + initialiseTuneParameters(settings); + _protocolHandler.writeFrame(new ProtocolInitiation(_connection.getProtocolVersion())); } + public ConnectionTuneParameters getConnectionTuneParameters() + { + return _connectionTuneParameters; + } + + private void initialiseTuneParameters(ConnectionSettings settings) + { + _connectionTuneParameters = new ConnectionTuneParameters(); + _connectionTuneParameters.setHeartbeat(settings.getHeartbeatInterval08()); + _connectionTuneParameters.setHeartbeatTimeoutFactor(settings.getHeartbeatTimeoutFactor()); + } + + public void tuneConnection(ConnectionTuneParameters params) + { + _connectionTuneParameters = params; + AMQConnection con = getAMQConnection(); + + con.setMaximumChannelCount(params.getChannelMax()); + con.setMaximumFrameSize(params.getFrameMax()); + + _protocolHandler.initHeartbeats(params.getHeartbeat(), params.getHeartbeatTimeoutFactor()); + } + public String getClientID() { try @@ -170,24 +188,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession _saslClient = client; } - public ConnectionTuneParameters getConnectionTuneParameters() - { - return _connectionTuneParameters; - } - - public void setConnectionTuneParameters(ConnectionTuneParameters params) - { - _connectionTuneParameters = params; - AMQConnection con = getAMQConnection(); - - con.setMaximumChannelCount(params.getChannelMax()); - con.setMaximumFrameSize(params.getFrameMax()); - _protocolHandler.initHeartbeats((int) params.getHeartbeat()); - } - /** - * Callback invoked from the BasicDeliverMethodHandler when a message has been received. This is invoked on the MINA - * dispatcher thread. + * Callback invoked from the BasicDeliverMethodHandler when a message has been received. * * @param message * @@ -409,7 +411,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { if (_logger.isDebugEnabled()) { - _logger.debug("Setting ProtocolVersion to :" + pv); + _logger.debug("Setting ProtocolVersion to :" + pv); } _protocolVersion = pv; _methodRegistry = MethodRegistry.getMethodRegistry(pv); diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java b/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java deleted file mode 100644 index 35ea44a331..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.protocol; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class HeartbeatConfig -{ - private static final Logger _logger = LoggerFactory.getLogger(HeartbeatConfig.class); - static final HeartbeatConfig CONFIG = new HeartbeatConfig(); - - /** - * The factor used to get the timeout from the delay between heartbeats. - */ - private float timeoutFactor = 2; - - HeartbeatConfig() - { - String property = System.getProperty("amqj.heartbeat.timeoutFactor"); - if (property != null) - { - try - { - timeoutFactor = Float.parseFloat(property); - } - catch (NumberFormatException e) - { - _logger.warn("Invalid timeout factor (amqj.heartbeat.timeoutFactor): " + property); - } - } - } - - float getTimeoutFactor() - { - return timeoutFactor; - } - - int getTimeout(int writeDelay) - { - return (int) (timeoutFactor * writeDelay); - } -} diff --git a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java index 4a7fca1efa..b039d8b005 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java @@ -34,8 +34,9 @@ public interface BrokerDetails public static final String OPTIONS_RETRY = "retries"; public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout"; public static final String OPTIONS_CONNECT_DELAY = "connectdelay"; - public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout"; // deprecated public static final String OPTIONS_HEARTBEAT = "heartbeat"; + @Deprecated + public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout"; public static final String OPTIONS_SASL_MECHS = "sasl_mechs"; public static final String OPTIONS_SASL_ENCRYPTION = "sasl_encryption"; public static final String OPTIONS_SSL = "ssl"; diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java index 1e9e5b00a5..ad9d3d3516 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java @@ -164,4 +164,30 @@ public class BrokerDetailsTest extends TestCase assertFalse("value should be false", Boolean.valueOf(broker.getProperty(BrokerDetails.OPTIONS_SSL))); } + + public void testHeartbeatDefaultsToNull() throws Exception + { + String brokerURL = "tcp://localhost:5672"; + AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL); + assertNull("unexpected default value for " + BrokerDetails.OPTIONS_HEARTBEAT, broker.getProperty(BrokerDetails.OPTIONS_HEARTBEAT)); + } + + public void testOverriddingHeartbeat() throws Exception + { + String brokerURL = "tcp://localhost:5672?heartbeat='60'"; + AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL); + assertEquals(60, Integer.parseInt(broker.getProperty(BrokerDetails.OPTIONS_HEARTBEAT))); + + assertEquals(Integer.valueOf(60), broker.buildConnectionSettings().getHeartbeatInterval08()); + } + + @SuppressWarnings("deprecation") + public void testLegacyHeartbeat() throws Exception + { + String brokerURL = "tcp://localhost:5672?idle_timeout='60000'"; + AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL); + assertEquals(60000, Integer.parseInt(broker.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT))); + + assertEquals(Integer.valueOf(60), broker.buildConnectionSettings().getHeartbeatInterval08()); + } } diff --git a/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java b/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java index 517fd1829f..b73f08f824 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java @@ -20,13 +20,7 @@ */ package org.apache.qpid.configuration; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Arrays; -import java.util.List; import java.util.Map; -import java.util.Properties; public interface Accessor { @@ -34,6 +28,7 @@ public interface Accessor public Integer getInt(String name); public Long getLong(String name); public String getString(String name); + public Float getFloat(String name); static class SystemPropertyAccessor implements Accessor { @@ -56,6 +51,11 @@ public interface Accessor { return System.getProperty(name); } + + public Float getFloat(String name) + { + return System.getProperty(name) == null ? null : Float.parseFloat(System.getProperty(name)); + } } static class MapAccessor implements Accessor @@ -147,132 +147,24 @@ public interface Accessor return null; } } - } - - static class PropertyFileAccessor extends MapAccessor - { - public PropertyFileAccessor(String fileName) throws FileNotFoundException, IOException - { - super(null); - Properties props = new Properties(); - FileInputStream inStream = new FileInputStream(fileName); - try - { - props.load(inStream); - } - finally - { - inStream.close(); - } - setSource(props); - } - - - } - - static class CombinedAccessor implements Accessor - { - private List<Accessor> accessors; - - public CombinedAccessor(Accessor...accessors) - { - this.accessors = Arrays.asList(accessors); - } - - public Boolean getBoolean(String name) - { - for (Accessor accessor: accessors) - { - if (accessor.getBoolean(name) != null) - { - return accessor.getBoolean(name); - } - } - return null; - } - public Integer getInt(String name) - { - for (Accessor accessor: accessors) - { - if (accessor.getBoolean(name) != null) - { - return accessor.getInt(name); - } - } - return null; - } - - public Long getLong(String name) + public Float getFloat(String name) { - for (Accessor accessor: accessors) + if (source != null && source.containsKey(name)) { - if (accessor.getBoolean(name) != null) + if (source.get(name) instanceof Float) { - return accessor.getLong(name); + return (Float)source.get(name); } - } - return null; - } - - public String getString(String name) - { - for (Accessor accessor: accessors) - { - if (accessor.getBoolean(name) != null) + else { - return accessor.getString(name); + return Float.parseFloat((String)source.get(name)); } } - return null; - } - } - - static class ValidationAccessor implements Accessor - { - private List<Validator> validators; - private Accessor delegate; - - public ValidationAccessor(Accessor delegate,Validator...validators) - { - this.validators = Arrays.asList(validators); - this.delegate = delegate; - } - - public Boolean getBoolean(String name) - { - // there is nothing to validate in a boolean - return delegate.getBoolean(name); - } - - public Integer getInt(String name) - { - Integer v = delegate.getInt(name); - for (Validator validator: validators) - { - validator.validate(v); - } - return v; - } - - public Long getLong(String name) - { - Long v = delegate.getLong(name); - for (Validator validator: validators) - { - validator.validate(v); - } - return v; - } - - public String getString(String name) - { - String v = delegate.getString(name); - for (Validator validator: validators) + else { - validator.validate(v); + return null; } - return v; } } } diff --git a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java index 51dad51bf9..b43b9d450b 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java @@ -63,20 +63,47 @@ public class ClientProperties public static final String SYNC_PUBLISH_PROP_NAME = "sync_publish"; /** - * This value will be used in the following settings - * To calculate the SO_TIMEOUT option of the socket (2*idle_timeout) - * If this values is between the max and min values specified for heartbeat - * by the broker in TuneOK it will be used as the heartbeat interval. - * If not a warning will be printed and the max value specified for - * heartbeat in TuneOK will be used - * - * The default idle timeout is set to 120 secs + * Frequency of heartbeat messages (in milliseconds) + * @see #QPID_HEARTBEAT_INTERVAL */ + @Deprecated public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout"; - public static final long DEFAULT_IDLE_TIMEOUT = 120000; - public static final String HEARTBEAT = "qpid.heartbeat"; - public static final int HEARTBEAT_DEFAULT = 120; + /** + * Frequency of heartbeat messages (in seconds) + * @see #QPID_HEARTBEAT_INTERVAL + */ + @Deprecated + public static final String AMQJ_HEARTBEAT_DELAY = "amqj.heartbeat.delay"; + + /** + * Frequency of heartbeat messages (in seconds) + */ + public static final String QPID_HEARTBEAT_INTERVAL = "qpid.heartbeat"; + + /** + * Default heartbeat interval (used by 0-10 protocol). + */ + public static final int QPID_HEARTBEAT_INTERVAL_010_DEFAULT = 120; + + /** + * @see #QPID_HEARTBEAT_TIMEOUT_FACTOR + */ + @Deprecated + public static final String AMQJ_HEARTBEAT_TIMEOUT_FACTOR = "amqj.heartbeat.timeoutFactor"; + + /** + * The factor applied to {@link #QPID_HEARTBEAT_INTERVAL} that determines the maximum + * length of time that may elapse before the peer is deemed to have failed. + * + * @see #QPID_HEARTBEAT_TIMEOUT_FACTOR_DEFAULT + */ + public static final String QPID_HEARTBEAT_TIMEOUT_FACTOR = "qpid.heartbeat_timeout_factor"; + + /** + * Default heartbeat timeout factor. + */ + public static final float QPID_HEARTBEAT_TIMEOUT_FACTOR_DEFAULT = 2.0f; /** * This value will be used to determine the default destination syntax type. @@ -215,6 +242,8 @@ public class ClientProperties */ public static final String SET_EXPIRATION_AS_TTL = "qpid.set_expiration_as_ttl"; + + private ClientProperties() { //No instances diff --git a/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java b/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java index e0989495bb..3ed32a604a 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java @@ -102,6 +102,11 @@ public abstract class QpidProperty<T> return new QpidStringProperty(accessor,defaultValue, names); } + public static QpidProperty<Float> floatProperty(Float defaultValue, String... names) + { + return new QpidFloatProperty(defaultValue, names); + } + protected Accessor getAccessor() { return accessor; @@ -183,4 +188,23 @@ public abstract class QpidProperty<T> } } + static class QpidFloatProperty extends QpidProperty<Float> + { + QpidFloatProperty(Float defValue, String... names) + { + super(defValue, names); + } + + QpidFloatProperty(Accessor accessor,Float defValue, String... names) + { + super(accessor,defValue, names); + } + + @Override + protected Float getByName(String name) + { + return getAccessor().getFloat(name); + } + } + }
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java index 185c01d3df..33604b05d9 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java @@ -62,6 +62,5 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto public void setSender(Sender<ByteBuffer> sender); - public void init(); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index c75adab444..75eb0e19a7 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -133,15 +133,17 @@ public class ClientDelegate extends ConnectionDelegate @Override public void connectionTune(Connection conn, ConnectionTune tune) { - int hb_interval = calculateHeartbeatInterval(_connectionSettings.getHeartbeatInterval(), - tune.getHeartbeatMin(), - tune.getHeartbeatMax() - ); + int heartbeatInterval = _connectionSettings.getHeartbeatInterval010(); + float heartbeatTimeoutFactor = _connectionSettings.getHeartbeatTimeoutFactor(); + int actualHeartbeatInterval = calculateHeartbeatInterval(heartbeatInterval, + tune.getHeartbeatMin(), + tune.getHeartbeatMax()); conn.connectionTuneOk(tune.getChannelMax(), tune.getMaxFrameSize(), - hb_interval); - // The idle timeout is twice the heartbeat amount (in milisecs) - conn.setIdleTimeout(hb_interval*1000*2); + actualHeartbeatInterval); + + int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor); + conn.setIdleTimeout(idleTimeout); int channelMax = tune.getChannelMax(); //0 means no implied limit, except available server resources @@ -184,7 +186,7 @@ public class ClientDelegate extends ConnectionDelegate int i = heartbeat; if (i == 0) { - log.info("Idle timeout is 0 sec. Heartbeats are disabled."); + log.info("Heartbeat interval is 0 sec. Heartbeats are disabled."); return 0; // heartbeats are disabled. } else if (i >= min && i <= max) @@ -193,7 +195,7 @@ public class ClientDelegate extends ConnectionDelegate } else { - log.info("The broker does not support the configured connection idle timeout of %s sec," + + log.info("The broker does not support the configured connection heartbeat interval of %s sec," + " using the brokers max supported value of %s sec instead.", i,max); return max; } diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java index 14dfeb18ec..2ff08fd751 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java @@ -20,6 +20,13 @@ */ package org.apache.qpid.transport; +import static org.apache.qpid.configuration.ClientProperties.AMQJ_HEARTBEAT_DELAY; +import static org.apache.qpid.configuration.ClientProperties.AMQJ_HEARTBEAT_TIMEOUT_FACTOR; +import static org.apache.qpid.configuration.ClientProperties.IDLE_TIMEOUT_PROP_NAME; +import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_INTERVAL; +import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_INTERVAL_010_DEFAULT; +import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_TIMEOUT_FACTOR; +import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_TIMEOUT_FACTOR_DEFAULT; import static org.apache.qpid.configuration.ClientProperties.AMQJ_TCP_NODELAY_PROP_NAME; import static org.apache.qpid.configuration.ClientProperties.QPID_SSL_KEY_MANAGER_FACTORY_ALGORITHM_PROP_NAME; import static org.apache.qpid.configuration.ClientProperties.QPID_SSL_KEY_STORE_CERT_TYPE_PROP_NAME; @@ -50,6 +57,7 @@ public class ConnectionSettings { public static final String WILDCARD_ADDRESS = "*"; + private String protocol = "tcp"; private String host = "localhost"; private String vhost; @@ -59,7 +67,9 @@ public class ConnectionSettings private boolean tcpNodelay = QpidProperty.booleanProperty(Boolean.TRUE, QPID_TCP_NODELAY_PROP_NAME, AMQJ_TCP_NODELAY_PROP_NAME).get(); private int maxChannelCount = 32767; private int maxFrameSize = 65535; - private int heartbeatInterval; + private Integer hearbeatIntervalLegacyMs = QpidProperty.intProperty(null, IDLE_TIMEOUT_PROP_NAME).get(); + private Integer heartbeatInterval = QpidProperty.intProperty(null, QPID_HEARTBEAT_INTERVAL, AMQJ_HEARTBEAT_DELAY).get(); + private float heartbeatTimeoutFactor = QpidProperty.floatProperty(QPID_HEARTBEAT_TIMEOUT_FACTOR_DEFAULT, QPID_HEARTBEAT_TIMEOUT_FACTOR, AMQJ_HEARTBEAT_TIMEOUT_FACTOR).get(); private int connectTimeout = 30000; private int readBufferSize = QpidProperty.intProperty(65535, RECEIVE_BUFFER_SIZE_PROP_NAME, LEGACY_RECEIVE_BUFFER_SIZE_PROP_NAME).get(); private int writeBufferSize = QpidProperty.intProperty(65535, SEND_BUFFER_SIZE_PROP_NAME, LEGACY_SEND_BUFFER_SIZE_PROP_NAME).get();; @@ -95,9 +105,45 @@ public class ConnectionSettings this.tcpNodelay = tcpNodelay; } - public int getHeartbeatInterval() - { - return heartbeatInterval; + /** + * Gets the heartbeat interval (seconds) for 0-8/9/9-1 protocols. + * 0 means heartbeating is disabled. + * null means use the broker-supplied value. + */ + public Integer getHeartbeatInterval08() + { + if (heartbeatInterval != null) + { + return heartbeatInterval; + } + else if (hearbeatIntervalLegacyMs != null) + { + return hearbeatIntervalLegacyMs / 1000; + } + else + { + return null; + } + } + + /** + * Gets the heartbeat interval (seconds) for the 0-10 protocol. + * 0 means heartbeating is disabled. + */ + public int getHeartbeatInterval010() + { + if (heartbeatInterval != null) + { + return heartbeatInterval; + } + else if (hearbeatIntervalLegacyMs != null) + { + return hearbeatIntervalLegacyMs / 1000; + } + else + { + return QPID_HEARTBEAT_INTERVAL_010_DEFAULT; + } } public void setHeartbeatInterval(int heartbeatInterval) @@ -105,6 +151,11 @@ public class ConnectionSettings this.heartbeatInterval = heartbeatInterval; } + public float getHeartbeatTimeoutFactor() + { + return this.heartbeatTimeoutFactor; + } + public String getProtocol() { return protocol; @@ -374,4 +425,5 @@ public class ConnectionSettings { this.writeBufferSize = writeBufferSize; } + } diff --git a/java/common/src/test/java/org/apache/qpid/configuration/QpidPropertyTest.java b/java/common/src/test/java/org/apache/qpid/configuration/QpidPropertyTest.java index 2a8c177f64..335270264c 100644 --- a/java/common/src/test/java/org/apache/qpid/configuration/QpidPropertyTest.java +++ b/java/common/src/test/java/org/apache/qpid/configuration/QpidPropertyTest.java @@ -145,6 +145,25 @@ public class QpidPropertyTest extends QpidTestCase assertEquals(expectedValue, propertyValue); } + public void testFloatValueReadFromSystemProperty() throws Exception + { + float expectedValue = 1.5f; + setTestSystemProperty(_systemPropertyName, Float.valueOf(expectedValue).toString()); + assertSystemPropertiesSet(_systemPropertyName); + + float propertyValue = QpidProperty.floatProperty(1.5f, _systemPropertyName).get(); + assertEquals(expectedValue, propertyValue, 0.1); + } + + public void testFloatValueIsDefaultWhenOneSystemPropertyIsNotSet() throws Exception + { + float expectedValue = 1.5f; + assertSystemPropertiesNotSet(_systemPropertyName); + + float propertyValue = QpidProperty.floatProperty(expectedValue, _systemPropertyName).get(); + assertEquals(expectedValue, propertyValue, 0.1); + } + private void assertSystemPropertiesSet(String... systemPropertyNames) { for (String systemPropertyName : systemPropertyNames) diff --git a/java/common/src/test/java/org/apache/qpid/transport/ConnectionSettingsTest.java b/java/common/src/test/java/org/apache/qpid/transport/ConnectionSettingsTest.java index fc4f5374f0..d031842f9d 100644 --- a/java/common/src/test/java/org/apache/qpid/transport/ConnectionSettingsTest.java +++ b/java/common/src/test/java/org/apache/qpid/transport/ConnectionSettingsTest.java @@ -128,11 +128,48 @@ public class ConnectionSettingsTest extends QpidTestCase } @SuppressWarnings("deprecation") - public void testtestReceiveBufferSizeOverriddenLegacyOverridden() + public void testReceiveBufferSizeOverriddenLegacyOverridden() { systemPropertyOverrideForSocketBufferSize(ClientProperties.LEGACY_RECEIVE_BUFFER_SIZE_PROP_NAME, 1024, true); } + public void testHeartbeatingDefaults() + { + assertNull(_conConnectionSettings.getHeartbeatInterval08()); + assertEquals(ClientProperties.QPID_HEARTBEAT_INTERVAL_010_DEFAULT,_conConnectionSettings.getHeartbeatInterval010()); + assertEquals(2.0, _conConnectionSettings.getHeartbeatTimeoutFactor(), 0.1); + } + + public void testHeartbeatingOverridden() + { + resetSystemProperty(ClientProperties.QPID_HEARTBEAT_INTERVAL, "60"); + resetSystemProperty(ClientProperties.QPID_HEARTBEAT_TIMEOUT_FACTOR, "2.5"); + + assertEquals(Integer.valueOf(60), _conConnectionSettings.getHeartbeatInterval08()); + assertEquals(60, _conConnectionSettings.getHeartbeatInterval010()); + assertEquals(2.5, _conConnectionSettings.getHeartbeatTimeoutFactor(), 0.1); + } + + @SuppressWarnings("deprecation") + public void testHeartbeatingOverriddenUsingAmqjLegacyOption() + { + resetSystemProperty(ClientProperties.AMQJ_HEARTBEAT_DELAY, "30"); + resetSystemProperty(ClientProperties.AMQJ_HEARTBEAT_TIMEOUT_FACTOR, "1.5"); + + assertEquals(Integer.valueOf(30), _conConnectionSettings.getHeartbeatInterval08()); + assertEquals(30, _conConnectionSettings.getHeartbeatInterval010()); + assertEquals(1.5, _conConnectionSettings.getHeartbeatTimeoutFactor(), 0.1); + } + + @SuppressWarnings("deprecation") + public void testHeartbeatingOverriddenUsingOlderLegacyOption() + { + resetSystemProperty(ClientProperties.IDLE_TIMEOUT_PROP_NAME, "30000"); + + assertEquals(Integer.valueOf(30), _conConnectionSettings.getHeartbeatInterval08()); + assertEquals(30, _conConnectionSettings.getHeartbeatInterval010()); + } + private void systemPropertyOverrideForTcpDelay(String propertyName, boolean value) { resetSystemProperty(propertyName, String.valueOf(value)); diff --git a/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java b/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java index 0e01bda8d0..143565c648 100644 --- a/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java +++ b/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java @@ -18,49 +18,86 @@ */ package org.apache.qpid.client; +import static org.apache.qpid.configuration.ClientProperties.AMQJ_HEARTBEAT_DELAY; +import static org.apache.qpid.configuration.ClientProperties.IDLE_TIMEOUT_PROP_NAME; +import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_INTERVAL; + import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; + +import org.apache.qpid.server.model.Broker; import org.apache.qpid.test.utils.QpidBrokerTestCase; public class HeartbeatTest extends QpidBrokerTestCase { - public void testHeartbeats() throws Exception + private static final String CONNECTION_URL_WITH_HEARTBEAT = "amqp://guest:guest@clientid/?brokerlist='localhost:%d?heartbeat='%d''"; + private TestListener _listener = new TestListener(); + + @Override + public void setUp() throws Exception + { + if (getName().equals("testHeartbeatsEnabledBrokerSide")) + { + getBrokerConfiguration().setBrokerAttribute(Broker.CONNECTION_HEART_BEAT_DELAY, "1"); + } + super.setUp(); + } + + public void testHeartbeatsEnabledUsingUrl() throws Exception + { + final String url = String.format(CONNECTION_URL_WITH_HEARTBEAT, DEFAULT_PORT, 1); + AMQConnection conn = (AMQConnection) getConnection(new AMQConnectionURL(url)); + conn.setHeartbeatListener(_listener); + conn.start(); + + Thread.sleep(2500); + + assertTrue("Too few heartbeats received: "+_listener._heartbeatsReceived+" (expected at least 2)", _listener._heartbeatsReceived>=2); + assertTrue("Too few heartbeats sent "+_listener._heartbeatsSent+" (expected at least 2)", _listener._heartbeatsSent>=2); + + conn.close(); + } + + public void testHeartbeatsEnabledUsingSystemProperty() throws Exception { - setTestSystemProperty("amqj.heartbeat.delay", "1"); + setTestSystemProperty(QPID_HEARTBEAT_INTERVAL, "1"); AMQConnection conn = (AMQConnection) getConnection(); - TestListener listener = new TestListener(); - conn.setHeartbeatListener(listener); + conn.setHeartbeatListener(_listener); conn.start(); Thread.sleep(2500); - assertTrue("Too few heartbeats received: "+listener._heartbeatsReceived+" (expected at least 2)", listener._heartbeatsReceived>=2); - assertTrue("Too few heartbeats sent "+listener._heartbeatsSent+" (expected at least 2)", listener._heartbeatsSent>=2); + assertTrue("Too few heartbeats received: "+_listener._heartbeatsReceived+" (expected at least 2)", _listener._heartbeatsReceived>=2); + assertTrue("Too few heartbeats sent "+_listener._heartbeatsSent+" (expected at least 2)", _listener._heartbeatsSent>=2); conn.close(); } - public void testNoHeartbeats() throws Exception + public void testHeartbeatsDisabledUsingSystemProperty() throws Exception { - setTestSystemProperty("amqj.heartbeat.delay", "0"); + setTestSystemProperty(QPID_HEARTBEAT_INTERVAL, "0"); AMQConnection conn = (AMQConnection) getConnection(); - TestListener listener = new TestListener(); - conn.setHeartbeatListener(listener); + conn.setHeartbeatListener(_listener); conn.start(); Thread.sleep(2500); - assertEquals("Heartbeats unexpectedly received", 0, listener._heartbeatsReceived); - assertEquals("Heartbeats unexpectedly sent ", 0, listener._heartbeatsSent); + assertEquals("Heartbeats unexpectedly received", 0, _listener._heartbeatsReceived); + assertEquals("Heartbeats unexpectedly sent ", 0, _listener._heartbeatsSent); conn.close(); } - public void testReadOnlyConnectionHeartbeats() throws Exception + /** + * This test carefully arranges message flow so that bytes flow only from producer to broker + * on the producer side and broker to consumer on the consumer side, deliberately leaving the + * reverse path quiet so heartbeats will flow. + */ + public void testUnidirectionalHeartbeating() throws Exception { - setTestSystemProperty("amqj.heartbeat.delay","1"); + setTestSystemProperty(QPID_HEARTBEAT_INTERVAL,"1"); AMQConnection receiveConn = (AMQConnection) getConnection(); AMQConnection sendConn = (AMQConnection) getConnection(); Destination destination = getTestQueue(); @@ -83,10 +120,9 @@ public class HeartbeatTest extends QpidBrokerTestCase producer.send(senderSession.createTextMessage("Msg " + i)); Thread.sleep(500); assertNotNull("Expected to received message", consumer.receive(500)); + // Consumer does not ack the message in order to generate no bytes from consumer back to Broker } - - assertTrue("Too few heartbeats sent "+receiveListener._heartbeatsSent+" (expected at least 2)", receiveListener._heartbeatsSent>=2); assertEquals("Unexpected sent at the sender: ",0,sendListener._heartbeatsSent); @@ -97,6 +133,54 @@ public class HeartbeatTest extends QpidBrokerTestCase sendConn.close(); } + public void testHeartbeatsEnabledBrokerSide() throws Exception + { + + AMQConnection conn = (AMQConnection) getConnection(); + conn.setHeartbeatListener(_listener); + conn.start(); + + Thread.sleep(2500); + + assertTrue("Too few heartbeats received: "+_listener._heartbeatsReceived+" (expected at least 2)", _listener._heartbeatsReceived>=2); + assertTrue("Too few heartbeats sent "+_listener._heartbeatsSent+" (expected at least 2)", _listener._heartbeatsSent>=2); + + conn.close(); + } + + + @SuppressWarnings("deprecation") + public void testHeartbeatsEnabledUsingAmqjLegacySystemProperty() throws Exception + { + setTestSystemProperty(AMQJ_HEARTBEAT_DELAY, "1"); + AMQConnection conn = (AMQConnection) getConnection(); + conn.setHeartbeatListener(_listener); + conn.start(); + + Thread.sleep(2500); + + assertTrue("Too few heartbeats received: "+_listener._heartbeatsReceived+" (expected at least 2)", _listener._heartbeatsReceived>=2); + assertTrue("Too few heartbeats sent "+_listener._heartbeatsSent+" (expected at least 2)", _listener._heartbeatsSent>=2); + + conn.close(); + } + + @SuppressWarnings("deprecation") + public void testHeartbeatsEnabledUsingOlderLegacySystemProperty() throws Exception + { + setTestSystemProperty(IDLE_TIMEOUT_PROP_NAME, "1000"); + AMQConnection conn = (AMQConnection) getConnection(); + conn.setHeartbeatListener(_listener); + conn.start(); + + Thread.sleep(2500); + + assertTrue("Too few heartbeats received: "+_listener._heartbeatsReceived+" (expected at least 2)", _listener._heartbeatsReceived>=2); + assertTrue("Too few heartbeats sent "+_listener._heartbeatsSent+" (expected at least 2)", _listener._heartbeatsSent>=2); + + conn.close(); + } + private class TestListener implements HeartbeatListener { int _heartbeatsReceived; diff --git a/java/test-profiles/CPPExcludes b/java/test-profiles/CPPExcludes index 8ea592daf9..6ce0936c08 100755 --- a/java/test-profiles/CPPExcludes +++ b/java/test-profiles/CPPExcludes @@ -186,7 +186,8 @@ org.apache.qpid.client.ssl.SSLTest#testCreateSSLandTCPonSamePort // QPID-2796 : Java 0-10 client only sends heartbeats in response to heartbeats from the server, not timeout based -org.apache.qpid.client.HeartbeatTest#testReadOnlyConnectionHeartbeats +org.apache.qpid.client.HeartbeatTest#testUnidirectionalHeartbeating +org.apache.qpid.client.HeartbeatTest#testHeartbeatsEnabledBrokerSide // Exclude java broker specific behavior allowing queue re-bind to topic exchanges on 0.8/0-10 paths org.apache.qpid.server.queue.QueueBindTest#testQueueCanBeReboundOnTopicExchange diff --git a/java/test-profiles/Java010Excludes b/java/test-profiles/Java010Excludes index 02c2db1134..093821647d 100755 --- a/java/test-profiles/Java010Excludes +++ b/java/test-profiles/Java010Excludes @@ -68,7 +68,8 @@ org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testFlowContro org.apache.qpid.client.AsynchMessageListenerTest#testImmediatePrefetchWithMessageListener // QPID-2796 : Java 0-10 client only sends heartbeats in response to heartbeats from the server, not timeout based -org.apache.qpid.client.HeartbeatTest#testReadOnlyConnectionHeartbeats +org.apache.qpid.client.HeartbeatTest#testUnidirectionalHeartbeating +org.apache.qpid.client.HeartbeatTest#testHeartbeatsEnabledBrokerSide // Java 0-10 client does not support re-binding the queue to the same exchange org.apache.qpid.server.queue.QueueBindTest#testQueueCanBeReboundOnTopicExchange |