diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2012-11-28 21:21:05 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2012-11-28 21:21:05 +0000 |
commit | f58f4189e28b940bac1134325796f0c5c2751068 (patch) | |
tree | 1b0e6e73dacbdb51e13c24fc5b73b6cfbf4a6d39 | |
parent | 282ceb6824629a16efdf3d5e1c447b05be04ca09 (diff) | |
download | qpid-python-f58f4189e28b940bac1134325796f0c5c2751068.tar.gz |
QPID-2796 : Restore heartbeating functionality to the Java Broker, and the 0-8,0-9,0-9-1 Java Client
merge from trunk r1413376,1413539,1413549,1413567
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.20@1414927 13f79535-47bb-0310-9956-ffa450edef68
33 files changed, 994 insertions, 180 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 976d7fd28a..72c21d357e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -150,6 +151,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private boolean _blocking; private final Lock _receivedLock; + private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis()); + public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId) { @@ -541,7 +544,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi final ByteBuffer buf = asByteBuffer(frame); _writtenBytes += buf.remaining(); _sender.send(buf); - _lastIoTime = System.currentTimeMillis(); + final long time = System.currentTimeMillis(); + _lastIoTime = time; + _lastWriteTime.set(time); + if(!_deferFlush) { _sender.flush(); @@ -1058,12 +1064,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void readerIdle() { - // Nothing + // TODO - enforce disconnect on lack of inbound data } public synchronized void writerIdle() { - _sender.send(asByteBuffer(HeartbeatBody.FRAME)); + writeFrame(HeartbeatBody.FRAME); } public void exception(Throwable throwable) @@ -1461,4 +1467,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { return _receivedLock; } + + @Override + public long getLastReadTime() + { + return _lastReceivedTime; + } + + @Override + public long getLastWriteTime() + { + return _lastWriteTime.get(); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index c62f1b56f0..c8126b3677 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -217,6 +217,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine _sender = sender; } + @Override + public long getLastReadTime() + { + return _delegate.getLastReadTime(); + } + + @Override + public long getLastWriteTime() + { + return _delegate.getLastWriteTime(); + } + private static interface DelegateCreator { @@ -409,6 +421,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } + @Override + public long getLastReadTime() + { + return 0; + } + + @Override + public long getLastWriteTime() + { + return 0; + } + public long getConnectionId() { return _id; @@ -566,5 +590,17 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine { } + + @Override + public long getLastReadTime() + { + return 0; + } + + @Override + public long getLastWriteTime() + { + return 0; + } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java index d7d26cc772..7d263b517c 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java @@ -44,6 +44,8 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol private ServerConnection _connection; private final IApplicationRegistry _appRegistry; private long _createTime = System.currentTimeMillis(); + private long _lastReadTime; + private long _lastWriteTime; public ProtocolEngine_0_10(ServerConnection conn, NetworkConnection network, @@ -71,13 +73,61 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol { _network = network; - _connection.setSender(new Disassembler(sender, MAX_FRAME_SIZE)); + _connection.setNetworkConnection(network); + _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE)); _connection.setPeerPrincipal(_network.getPeerPrincipal()); // FIXME Two log messages to maintain compatibility with earlier protocol versions _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, null, false, false, false)); _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", null, false, true, false)); } + private Sender<ByteBuffer> wrapSender(final Sender<ByteBuffer> sender) + { + return new Sender<ByteBuffer>() + { + @Override + public void setIdleTimeout(int i) + { + sender.setIdleTimeout(i); + + } + + @Override + public void send(ByteBuffer msg) + { + _lastWriteTime = System.currentTimeMillis(); + sender.send(msg); + + } + + @Override + public void flush() + { + sender.flush(); + + } + + @Override + public void close() + { + sender.close(); + + } + }; + } + + @Override + public long getLastReadTime() + { + return _lastReadTime; + } + + @Override + public long getLastWriteTime() + { + return _lastWriteTime; + } + public SocketAddress getRemoteAddress() { return _network.getRemoteAddress(); @@ -90,6 +140,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol public void received(final ByteBuffer buf) { + _lastReadTime = System.currentTimeMillis(); super.received(buf); _connection.receivedComplete(); } @@ -106,7 +157,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol public void writerIdle() { - //Todo + _connection.doHeartbeat(); } public void readerIdle() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java index b5110a21d8..715a512b47 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java @@ -53,6 +53,8 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa //private NetworkConnection _networkDriver; private long _readBytes; private long _writtenBytes; + private long _lastReadTime; + private long _lastWriteTime; private final IApplicationRegistry _appRegistry; private long _createTime = System.currentTimeMillis(); private ConnectionEndpoint _conn; @@ -182,6 +184,7 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa public synchronized void received(ByteBuffer msg) { + _lastReadTime = System.currentTimeMillis(); if(RAW_LOGGER.isLoggable(Level.FINE)) { ByteBuffer dup = msg.duplicate(); @@ -324,6 +327,7 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa synchronized(_sendLock) { + _lastWriteTime = System.currentTimeMillis(); if(FRAME_LOGGER.isLoggable(Level.FINE)) { FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody()); @@ -378,4 +382,13 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa return _connectionId; } + public long getLastReadTime() + { + return _lastReadTime; + } + + public long getLastWriteTime() + { + return _lastWriteTime; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java index 71d6df27e0..634c5e6255 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java @@ -51,6 +51,9 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut { private long _readBytes; private long _writtenBytes; + + private long _lastReadTime; + private long _lastWriteTime; private final IApplicationRegistry _appRegistry; private long _createTime = System.currentTimeMillis(); private ConnectionEndpoint _conn; @@ -222,6 +225,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut public synchronized void received(ByteBuffer msg) { + _lastReadTime = System.currentTimeMillis(); if(RAW_LOGGER.isLoggable(Level.FINE)) { ByteBuffer dup = msg.duplicate(); @@ -364,7 +368,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut synchronized(_sendLock) { - + _lastWriteTime = System.currentTimeMillis(); if(FRAME_LOGGER.isLoggable(Level.FINE)) { FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody()); @@ -425,4 +429,13 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut return _connectionId; } + public long getLastReadTime() + { + return _lastReadTime; + } + + public long getLastWriteTime() + { + return _lastWriteTime; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index ce3ede2dba..58de6a0cdf 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -48,6 +48,7 @@ import org.apache.qpid.transport.ExecutionException; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.ProtocolEvent; import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.network.NetworkConnection; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT; @@ -68,6 +69,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, private AtomicLong _lastIoTime = new AtomicLong(); private boolean _blocking; private Principal _peerPrincipal; + private NetworkConnection _networkConnection; public ServerConnection(final long connectionId) { @@ -490,4 +492,20 @@ public class ServerConnection extends Connection implements AMQConnectionModel, { super.setLocalAddress(localAddress); } + + public void setNetworkConnection(NetworkConnection network) + { + _networkConnection = network; + } + + public NetworkConnection getNetworkConnection() + { + return _networkConnection; + } + + public void doHeartbeat() + { + super.doHeartBeat(); + + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java index 70f5afe5ac..f48121f9f0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -43,6 +43,8 @@ import org.apache.qpid.server.subscription.Subscription_0_10; import org.apache.qpid.server.virtualhost.State; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.*; +import org.apache.qpid.transport.network.NetworkConnection; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -226,14 +228,18 @@ public class ServerConnectionDelegate extends ServerDelegate return; } - setConnectionTuneOkChannelMax(sconn, okChannelMax); - } + if(ok.hasHeartbeat()) + { + final int heartbeat = ok.getHeartbeat(); + if(heartbeat > 0) + { + final NetworkConnection networkConnection = sconn.getNetworkConnection(); + networkConnection.setMaxReadIdle(2 * heartbeat); + networkConnection.setMaxWriteIdle(heartbeat); + } + } - @Override - protected int getHeartbeatMax() - { - //TODO: implement broker support for actually sending heartbeats - return 0; + setConnectionTuneOkChannelMax(sconn, okChannelMax); } @Override diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 6f2631ac05..a0e659c359 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1560,4 +1560,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect + localAddress + " to " + remoteAddress); } } + + void setHeartbeatListener(HeartbeatListener listener) + { + _delegate.setHeartbeatListener(listener); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index b6f25a2cef..a8fdaeb65c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -78,4 +78,6 @@ public interface AMQConnectionDelegate * @return true if the feature is supported by the server */ boolean isSupportedServerFeature(final String featureName); + + void setHeartbeatListener(HeartbeatListener listener); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 5dd6e55e64..69e79d42a0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -422,6 +422,12 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec return featureSupported; } + @Override + public void setHeartbeatListener(HeartbeatListener listener) + { + ((ClientConnectionDelegate)(_qpidConnection.getConnectionDelegate())).setHeartbeatListener(listener); + } + private ConnectionSettings retrieveConnectionSettings(BrokerDetails brokerDetail) { ConnectionSettings conSettings = brokerDetail.buildConnectionSettings(); @@ -484,10 +490,14 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec heartbeat = Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME)/1000; _logger.warn("JVM arg -Didle_timeout=<mili_secs> is deprecated, please use -Dqpid.heartbeat=<secs>"); } - else + else if(Integer.getInteger(ClientProperties.HEARTBEAT) != null) { heartbeat = Integer.getInteger(ClientProperties.HEARTBEAT,ClientProperties.HEARTBEAT_DEFAULT); } + else + { + heartbeat = Integer.getInteger("amqj.heartbeat.delay", ClientProperties.HEARTBEAT_DEFAULT); + } return heartbeat; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 740a81b939..67d7c2a78c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -122,7 +122,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate SecurityLayer securityLayer = SecurityLayerFactory.newInstance(settings); OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion()); - NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler())); + + NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()), + _conn.getProtocolHandler()); _conn.getProtocolHandler().setNetworkConnection(network, securityLayer.sender(network.getSender())); StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates); @@ -376,4 +378,10 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate // we just hardcode JMS selectors as supported. return ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR.equals(featureName); } + + @Override + public void setHeartbeatListener(HeartbeatListener listener) + { + _conn.getProtocolHandler().setHeartbeatListener(listener); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java b/qpid/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java new file mode 100644 index 0000000000..32a7cb0b73 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.client; + +public interface HeartbeatListener +{ + void heartbeatReceived(); + + void heartbeatSent(); + + static final HeartbeatListener DEFAULT = new HeartbeatListener() + { + public void heartbeatReceived() + { + } + + public void heartbeatSent() + { + } + }; +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index af89000c5c..816caac824 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.client.protocol; +import org.apache.qpid.client.HeartbeatListener; import org.apache.qpid.util.BytesDataOutput; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -178,6 +179,9 @@ public class AMQProtocolHandler implements ProtocolEngine private NetworkConnection _network; private Sender<ByteBuffer> _sender; + private long _lastReadTime = System.currentTimeMillis(); + private long _lastWriteTime = System.currentTimeMillis(); + private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT; /** * Creates a new protocol handler, associated with the specified client connection instance. @@ -300,7 +304,6 @@ public class AMQProtocolHandler implements ProtocolEngine { _logger.debug("Protocol Session [" + this + "] idle: reader"); // failover: - HeartbeatDiagnostics.timeout(); _logger.warn("Timed out while waiting for heartbeat from peer."); _network.close(); } @@ -309,7 +312,7 @@ public class AMQProtocolHandler implements ProtocolEngine { _logger.debug("Protocol Session [" + this + "] idle: reader"); writeFrame(HeartbeatBody.FRAME); - HeartbeatDiagnostics.sent(); + _heartbeatListener.heartbeatSent(); } /** @@ -442,6 +445,7 @@ public class AMQProtocolHandler implements ProtocolEngine public void received(ByteBuffer msg) { _readBytes += msg.remaining(); + _lastReadTime = System.currentTimeMillis(); try { final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); @@ -470,8 +474,6 @@ public class AMQProtocolHandler implements ProtocolEngine final AMQBody bodyFrame = frame.getBodyFrame(); - HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); - bodyFrame.handle(frame.getChannel(), _protocolSession); _connection.bytesReceived(_readBytes); @@ -560,6 +562,7 @@ public class AMQProtocolHandler implements ProtocolEngine public synchronized void writeFrame(AMQDataBlock frame, boolean flush) { final ByteBuffer buf = asByteBuffer(frame); + _lastWriteTime = System.currentTimeMillis(); _writtenBytes += buf.remaining(); _sender.send(buf); if(flush) @@ -882,6 +885,18 @@ public class AMQProtocolHandler implements ProtocolEngine _sender = sender; } + @Override + public long getLastReadTime() + { + return _lastReadTime; + } + + @Override + public long getLastWriteTime() + { + return _lastWriteTime; + } + protected Sender<ByteBuffer> getSender() { return _sender; @@ -894,7 +909,6 @@ public class AMQProtocolHandler implements ProtocolEngine { _network.setMaxWriteIdle(delay); _network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); - HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); } } @@ -909,5 +923,13 @@ public class AMQProtocolHandler implements ProtocolEngine } + public void setHeartbeatListener(HeartbeatListener listener) + { + _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener; + } + public void heartbeatBodyReceived() + { + _heartbeatListener.heartbeatReceived(); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index cf521c8892..aed10cf15f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -267,7 +267,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException { - + _protocolHandler.heartbeatBodyReceived(); } /** diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java deleted file mode 100644 index d387a8ba93..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.protocol; - -class HeartbeatDiagnostics -{ - private static final Diagnostics _impl = init(); - - private HeartbeatDiagnostics() - { - } - - private static Diagnostics init() - { - return Boolean.getBoolean("amqj.heartbeat.diagnostics") ? new On() : new Off(); - } - - static void sent() - { - _impl.sent(); - } - - static void timeout() - { - _impl.timeout(); - } - - static void received(boolean heartbeat) - { - _impl.received(heartbeat); - } - - static void init(int delay, int timeout) - { - _impl.init(delay, timeout); - } - - private static interface Diagnostics - { - void sent(); - void timeout(); - void received(boolean heartbeat); - void init(int delay, int timeout); - } - - private static class On implements Diagnostics - { - private final String[] messages = new String[50]; - private int i; - - private void save(String msg) - { - messages[i++] = msg; - if(i >= messages.length){ - i = 0;//i.e. a circular buffer - } - } - - public void sent() - { - save(System.currentTimeMillis() + ": sent heartbeat"); - } - - public void timeout() - { - for(int i = 0; i < messages.length; i++) - { - if(messages[i] != null) - { - System.out.println(messages[i]); - } - } - System.out.println(System.currentTimeMillis() + ": timed out"); - } - - public void received(boolean heartbeat) - { - save(System.currentTimeMillis() + ": received " + (heartbeat ? "heartbeat" : "data")); - } - - public void init(int delay, int timeout) - { - System.out.println(System.currentTimeMillis() + ": initialised delay=" + delay + ", timeout=" + timeout); - } - } - - private static class Off implements Diagnostics - { - public void sent() - { - - } - public void timeout() - { - - } - public void received(boolean heartbeat) - { - - } - - public void init(int delay, int timeout) - { - - } - } -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java index 3c9a6e1500..4789dd0ed7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.client.transport; +import org.apache.qpid.client.HeartbeatListener; +import org.apache.qpid.transport.ConnectionHeartbeat; import org.ietf.jgss.GSSContext; import org.ietf.jgss.GSSException; import org.ietf.jgss.GSSManager; @@ -70,6 +72,7 @@ public class ClientConnectionDelegate extends ClientDelegate } private final ConnectionURL _connectionURL; + private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT; /** * @param settings @@ -165,4 +168,19 @@ public class ClientConnectionDelegate extends ClientDelegate return null; } + + @Override + public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat) + { + // ClientDelegate simply responds to heartbeats with heartbeats + _heartbeatListener.heartbeatReceived(); + super.connectionHeartbeat(conn, hearbeat); + _heartbeatListener.heartbeatSent(); + } + + + public void setHeartbeatListener(HeartbeatListener listener) + { + _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener; + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java index 7ca588946b..6774d0a45a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java @@ -23,6 +23,7 @@ package org.apache.qpid.protocol; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.TransportActivity; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -31,7 +32,7 @@ import java.nio.ByteBuffer; * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received * decodes it and then process the result. */ -public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer> +public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>, TransportActivity { // Returns the remote address of the NetworkDriver SocketAddress getRemoteAddress(); @@ -56,6 +57,6 @@ public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer> void readerIdle(); - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender); + public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender); }
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index f95a0d215b..5ae2f1ceb2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -21,12 +21,7 @@ package org.apache.qpid.transport; import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.transport.network.Assembler; -import org.apache.qpid.transport.network.Disassembler; -import org.apache.qpid.transport.network.InputHandler; -import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.OutgoingNetworkTransport; -import org.apache.qpid.transport.network.Transport; +import org.apache.qpid.transport.network.*; import org.apache.qpid.transport.network.security.SecurityLayer; import org.apache.qpid.transport.network.security.SecurityLayerFactory; import org.apache.qpid.transport.util.Logger; @@ -73,6 +68,9 @@ public class Connection extends ConnectionInvoker //Usable channels are numbered 0 to <ChannelMax> - 1 public static final int MAX_CHANNEL_MAX = 0xFFFF; public static final int MIN_USABLE_CHANNEL_NUM = 0; + private long _lastSendTime; + private long _lastReadTime; + public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING } @@ -231,7 +229,8 @@ public class Connection extends ConnectionInvoker addConnectionListener((ConnectionListener)secureReceiver); } - NetworkConnection network = transport.connect(settings, secureReceiver); + NetworkConnection network = transport.connect(settings, secureReceiver, new ConnectionActivity()); + setRemoteAddress(network.getRemoteAddress()); setLocalAddress(network.getLocalAddress()); @@ -368,6 +367,7 @@ public class Connection extends ConnectionInvoker public void received(ProtocolEvent event) { + _lastReadTime = System.currentTimeMillis(); if(log.isDebugEnabled()) { log.debug("RECV: [%s] %s", this, event); @@ -377,6 +377,7 @@ public class Connection extends ConnectionInvoker public void send(ProtocolEvent event) { + _lastSendTime = System.currentTimeMillis(); if(log.isDebugEnabled()) { log.debug("SEND: [%s] %s", this, event); @@ -745,4 +746,38 @@ public class Connection extends ConnectionInvoker sessionDetached.setCode(sessionDetachCode); invoke(sessionDetached); } + + + protected void doHeartBeat() + { + connectionHeartbeat(); + } + + private class ConnectionActivity implements TransportActivity + { + @Override + public long getLastReadTime() + { + return _lastReadTime; + } + + @Override + public long getLastWriteTime() + { + return _lastSendTime; + } + + @Override + public void writerIdle() + { + connectionHeartbeat(); + } + + @Override + public void readerIdle() + { + // TODO + + } + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java index 4d4274278f..8437ef1a94 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java @@ -27,5 +27,7 @@ import javax.net.ssl.SSLContext; public interface IncomingNetworkTransport extends NetworkTransport { - public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext); + public void accept(NetworkTransportConfiguration config, + ProtocolEngineFactory factory, + SSLContext sslContext); }
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java index 12c42d6643..050d194c47 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java @@ -50,4 +50,8 @@ public interface NetworkConnection void setPeerPrincipal(Principal principal); Principal getPeerPrincipal(); + + int getMaxReadIdle(); + + int getMaxWriteIdle(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java index 92774f5842..45231aa05d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java @@ -29,5 +29,7 @@ public interface OutgoingNetworkTransport extends NetworkTransport { public NetworkConnection getConnection(); - public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate); + public NetworkConnection connect(ConnectionSettings settings, + Receiver<ByteBuffer> delegate, + TransportActivity transportActivity); }
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Ticker.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Ticker.java new file mode 100644 index 0000000000..210b014a57 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Ticker.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.transport.network; + +public interface Ticker +{ + int getTimeToNextTick(long currentTime); + + int tick(long currentTime); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportActivity.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportActivity.java new file mode 100644 index 0000000000..2ee336d9b2 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportActivity.java @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.transport.network; + +public interface TransportActivity +{ + long getLastReadTime(); + + long getLastWriteTime(); + + void writerIdle(); + + void readerIdle(); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java new file mode 100644 index 0000000000..54a2a360bb --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.transport.network.io; + +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.Ticker; +import org.apache.qpid.transport.network.TransportActivity; + +class IdleTimeoutTicker implements Ticker +{ + private final TransportActivity _transport; + private final int _defaultTimeout; + private NetworkConnection _connection; + + public IdleTimeoutTicker(TransportActivity transport, int defaultTimeout) + { + _transport = transport; + _defaultTimeout = defaultTimeout; + } + + @Override + public int getTimeToNextTick(long currentTime) + { + long nextTime = -1; + final long maxReadIdle = 1000l * _connection.getMaxReadIdle(); + + if(maxReadIdle > 0) + { + nextTime = _transport.getLastReadTime() + maxReadIdle; + } + + long maxWriteIdle = 1000l * _connection.getMaxWriteIdle(); + + if(maxWriteIdle > 0) + { + long writeTime = _transport.getLastWriteTime() + maxWriteIdle; + if(nextTime == -1l || writeTime < nextTime) + { + nextTime = writeTime; + } + } + return nextTime == -1 ? _defaultTimeout : (int) (nextTime - currentTime); + } + + @Override + public int tick(long currentTime) + { + // writer Idle + long maxWriteIdle = 1000l * _connection.getMaxWriteIdle(); + if(maxWriteIdle > 0 && maxWriteIdle+ _transport.getLastWriteTime() <= currentTime) + { + _transport.writerIdle(); + } + // reader Idle + final long maxReadIdle = 1000l * _connection.getMaxReadIdle(); + if(maxReadIdle > 0 && maxReadIdle+ _transport.getLastReadTime() <= currentTime) + { + + _transport.readerIdle(); + } + return getTimeToNextTick(currentTime); + } + + public void setConnection(NetworkConnection connection) + { + _connection = connection; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java index 2658296c5f..f5c09ac2cc 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java @@ -26,7 +26,9 @@ import java.nio.ByteBuffer; import java.security.Principal; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.Ticker; import org.apache.qpid.transport.network.NetworkConnection; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,14 +40,23 @@ public class IoNetworkConnection implements NetworkConnection private final IoSender _ioSender; private final IoReceiver _ioReceiver; private Principal _principal; + private int _maxReadIdle; + private int _maxWriteIdle; public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate, - int sendBufferSize, int receiveBufferSize, long timeout) + int sendBufferSize, int receiveBufferSize, long timeout) + { + this(socket,delegate,sendBufferSize,receiveBufferSize,timeout,null); + } + + public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate, + int sendBufferSize, int receiveBufferSize, long timeout, Ticker ticker) { _socket = socket; _timeout = timeout; _ioReceiver = new IoReceiver(_socket, delegate, receiveBufferSize,_timeout); + _ioReceiver.setTicker(ticker); _ioSender = new IoSender(_socket, 2 * sendBufferSize, _timeout); @@ -88,14 +99,12 @@ public class IoNetworkConnection implements NetworkConnection public void setMaxWriteIdle(int sec) { - // TODO implement support for setting heartbeating config in this way - // Currently a socket timeout is used in IoSender + _maxWriteIdle = sec; } public void setMaxReadIdle(int sec) { - // TODO implement support for setting heartbeating config in this way - // Currently a socket timeout is used in IoSender + _maxReadIdle = sec; } @Override @@ -109,4 +118,16 @@ public class IoNetworkConnection implements NetworkConnection { return _principal; } + + @Override + public int getMaxReadIdle() + { + return _maxReadIdle; + } + + @Override + public int getMaxWriteIdle() + { + return _maxWriteIdle; + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index 4cb526ef73..3f1cd3519b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -41,9 +41,8 @@ import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.NetworkTransportConfiguration; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.IncomingNetworkTransport; -import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.*; + import org.slf4j.LoggerFactory; public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport @@ -56,7 +55,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet private IoNetworkConnection _connection; private AcceptingThread _acceptor; - public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate) + public NetworkConnection connect(ConnectionSettings settings, + Receiver<ByteBuffer> delegate, + TransportActivity transportActivity) { int sendBufferSize = settings.getWriteBufferSize(); int receiveBufferSize = settings.getReadBufferSize(); @@ -91,7 +92,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet try { - _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT); + IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT); + _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker); + ticker.setConnection(_connection); _connection.start(); } catch(Exception e) @@ -128,7 +131,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet return _connection; } - public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext) + public void accept(NetworkTransportConfiguration config, + ProtocolEngineFactory factory, + SSLContext sslContext) { try { @@ -149,6 +154,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet private ProtocolEngineFactory _factory; private SSLContext _sslContext; private ServerSocket _serverSocket; + private int _timeout; private AcceptingThread(NetworkTransportConfiguration config, ProtocolEngineFactory factory, @@ -157,6 +163,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet _config = config; _factory = factory; _sslContext = sslContext; + _timeout = TIMEOUT; InetSocketAddress address = config.getAddress(); @@ -217,6 +224,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet { socket = _serverSocket.accept(); socket.setTcpNoDelay(_config.getTcpNoDelay()); + socket.setSoTimeout(_timeout); final Integer sendBufferSize = _config.getSendBufferSize(); final Integer receiveBufferSize = _config.getReceiveBufferSize(); @@ -226,7 +234,10 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet ProtocolEngine engine = _factory.newProtocolEngine(); - NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, TIMEOUT); + final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); + NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout, + ticker); + ticker.setConnection(connection); if(_sslContext != null) { @@ -293,6 +304,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet } } } + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index a36e5fedee..06a43e21c6 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -24,6 +24,7 @@ import org.apache.qpid.common.Closeable; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.Ticker; import org.apache.qpid.transport.util.Logger; import javax.net.ssl.SSLSocket; @@ -31,6 +32,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.Socket; import java.net.SocketException; +import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; @@ -51,6 +53,8 @@ final class IoReceiver implements Runnable, Closeable private final AtomicBoolean closed = new AtomicBoolean(false); private final Thread receiverThread; private static final boolean shutdownBroken; + + private Ticker _ticker; static { String osName = System.getProperty("os.name"); @@ -136,7 +140,7 @@ final class IoReceiver implements Runnable, Closeable { final int threshold = bufferSize / 2; - // I set the read buffer size simillar to SO_RCVBUF + // I set the read buffer size similar to SO_RCVBUF // Haven't tested with a lower value to see if it's better or worse byte[] buffer = new byte[bufferSize]; try @@ -144,17 +148,64 @@ final class IoReceiver implements Runnable, Closeable InputStream in = socket.getInputStream(); int read = 0; int offset = 0; - while ((read = in.read(buffer, offset, bufferSize-offset)) != -1) + long currentTime; + while(read != -1) { - if (read > 0) + try + { + while ((read = in.read(buffer, offset, bufferSize-offset)) != -1) + { + if (read > 0) + { + ByteBuffer b = ByteBuffer.wrap(buffer,offset,read); + receiver.received(b); + offset+=read; + if (offset > threshold) + { + offset = 0; + buffer = new byte[bufferSize]; + } + } + currentTime = System.currentTimeMillis(); + + if(_ticker != null) + { + int tick = _ticker.getTimeToNextTick(currentTime); + if(tick <= 0) + { + tick = _ticker.tick(currentTime); + } + try + { + if(!socket.isClosed()) + { + socket.setSoTimeout(tick <= 0 ? 1 : tick); + } + } + catch(SocketException e) + { + // ignore - closed socket + } + } + } + } + catch (SocketTimeoutException e) { - ByteBuffer b = ByteBuffer.wrap(buffer,offset,read); - receiver.received(b); - offset+=read; - if (offset > threshold) + currentTime = System.currentTimeMillis(); + if(_ticker != null) { - offset = 0; - buffer = new byte[bufferSize]; + final int tick = _ticker.tick(currentTime); + if(!socket.isClosed()) + { + try + { + socket.setSoTimeout(tick <= 0 ? 1 : tick ); + } + catch(SocketException ex) + { + // ignore - closed socket + } + } } } } @@ -195,4 +246,15 @@ final class IoReceiver implements Runnable, Closeable return !brokenClose && !sslSocketClosed; } + public Ticker getTicker() + { + return _ticker; + } + + public void setTicker(Ticker ticker) + { + _ticker = ticker; + } + + } diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java index 893f66c5ff..a19c2e7e43 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java @@ -83,6 +83,18 @@ public class TestNetworkConnection implements NetworkConnection return null; } + @Override + public int getMaxReadIdle() + { + return 0; + } + + @Override + public int getMaxWriteIdle() + { + return 0; + } + public void setMaxWriteIdle(int idleTime) { diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java index b4c0981131..bf9a5843d6 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java @@ -127,7 +127,9 @@ public class TransportTest extends QpidTestCase throw new UnsupportedOperationException(); } - public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate) + public NetworkConnection connect(ConnectionSettings settings, + Receiver<ByteBuffer> delegate, + TransportActivity transportActivity) { throw new UnsupportedOperationException(); } @@ -147,7 +149,7 @@ public class TransportTest extends QpidTestCase } public void accept(NetworkTransportConfiguration config, - ProtocolEngineFactory factory, SSLContext sslContext) + ProtocolEngineFactory factory, SSLContext sslContext) { throw new UnsupportedOperationException(); } diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java new file mode 100644 index 0000000000..5cdd7a8597 --- /dev/null +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java @@ -0,0 +1,257 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.transport.network.io; + +import junit.framework.TestCase; + +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.security.Principal; + +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.TransportActivity; + +public class IdleTimeoutTickerTest extends TestCase implements TransportActivity, NetworkConnection +{ + private IdleTimeoutTicker _ticker; + private static final int DEFAULT_TIMEOUT = 567890; + private long _lastReadTime; + private long _lastWriteTime; + private long _currentTime; + private int _maxWriteIdle; + private int _maxReadIdle; + private boolean _readerIdle; + private boolean _writerIdle; + + @Override + public void setUp() throws Exception + { + super.setUp(); + _ticker = new IdleTimeoutTicker(this, DEFAULT_TIMEOUT); + _ticker.setConnection(this); + _readerIdle = false; + _writerIdle = false; + _lastReadTime = 0l; + _lastWriteTime = 0l; + _maxReadIdle = 0; + _maxWriteIdle = 0; + } + + public void testNoIdle() throws Exception + { + _maxReadIdle = 4; + _maxWriteIdle = 2; + _lastReadTime = 0; + _lastWriteTime = 1500; + _currentTime = 3000; + // Current time = 3s, + // last read = 0s, max read idle = 4s, should check in 1s + // last write = 1.5s, max write idle = 2s, should check in 0.5s + long nextTime = _ticker.tick(_currentTime); + assertEquals("Incorrect next tick calculation", 500l, nextTime); + assertFalse("Incorrectly caused reader idle", _readerIdle); + assertFalse("Incorrectly caused writer idle", _writerIdle); + + + // Current time = 3.4s, + // last read = 0s, max read idle = 4s, should check in 0.6s + // last write = 3.1s, max write idle = 2s, should check in 1.7s + _lastWriteTime = 3100; + _currentTime = 3400; + nextTime = _ticker.tick(_currentTime); + assertEquals("Incorrect next tick calculation", 600l, nextTime); + assertFalse("Incorrectly caused reader idle", _readerIdle); + assertFalse("Incorrectly caused writer idle", _writerIdle); + + _maxReadIdle = 0; + nextTime = _ticker.tick(_currentTime); + assertEquals("Incorrect next tick calculation", 1700l, nextTime); + assertFalse("Incorrectly caused reader idle", _readerIdle); + assertFalse("Incorrectly caused writer idle", _writerIdle); + + _maxWriteIdle = 0; + nextTime = _ticker.tick(_currentTime); + assertEquals("Incorrect next tick calculation", DEFAULT_TIMEOUT, nextTime); + assertFalse("Incorrectly caused reader idle", _readerIdle); + assertFalse("Incorrectly caused writer idle", _writerIdle); + + } + + public void testReaderIdle() throws Exception + { + _maxReadIdle = 4; + _maxWriteIdle = 0; + _lastReadTime = 0; + _lastWriteTime = 2500; + _currentTime = 4000; + // Current time = 4s, + // last read = 0s, max read idle = 4s, reader idle + long nextTime = _ticker.tick(_currentTime); + + assertTrue(_readerIdle); + assertFalse(_writerIdle); + + _readerIdle = false; + + // last write = 2.5s, max write idle = 2s, should check in 0.5s + _maxWriteIdle = 2; + nextTime = _ticker.tick(_currentTime); + assertTrue(_readerIdle); + assertFalse(_writerIdle); + + _readerIdle = false; + // last write = 1.5s, max write idle = 2s, should check in 0.5s + + _lastWriteTime = 1500; + nextTime = _ticker.tick(_currentTime); + + assertTrue(_readerIdle); + assertTrue(_writerIdle); + + } + + public void testWriterIdle() throws Exception + { + _maxReadIdle = 0; + _maxWriteIdle = 2; + _lastReadTime = 0; + _lastWriteTime = 1500; + _currentTime = 4000; + // Current time = 4s, + // last write = 1.5s, max write idle = 2s, writer idle + long nextTime = _ticker.tick(_currentTime); + + assertTrue(_writerIdle); + assertFalse(_readerIdle); + assertEquals(2000l,nextTime); + + _writerIdle = false; + _lastWriteTime = 1500; + _maxReadIdle = 5; + + nextTime = _ticker.tick(_currentTime); + + assertTrue(_writerIdle); + assertFalse(_readerIdle); + assertEquals(1000l,nextTime); + + } + + //------------------------------------------------------------------------- + // Implement TransportActivity methods + //------------------------------------------------------------------------- + + @Override + public long getLastReadTime() + { + return _lastReadTime; + } + + @Override + public long getLastWriteTime() + { + return _lastWriteTime; + } + + @Override + public void writerIdle() + { + _writerIdle = true; + _lastWriteTime = _currentTime; + } + + @Override + public void readerIdle() + { + _readerIdle = true; + } + + //------------------------------------------------------------------------- + // Implement NetworkConnection methods + // Only actually use those relating to idle timeouts + //------------------------------------------------------------------------- + + @Override + public Sender<ByteBuffer> getSender() + { + return null; + } + + @Override + public void start() + { + } + + @Override + public void close() + { + } + + @Override + public SocketAddress getRemoteAddress() + { + return null; + } + + @Override + public SocketAddress getLocalAddress() + { + return null; + } + + @Override + public void setMaxWriteIdle(int sec) + { + _maxWriteIdle = sec; + } + + @Override + public void setMaxReadIdle(int sec) + { + _maxReadIdle = sec; + } + + @Override + public void setPeerPrincipal(Principal principal) + { + } + + @Override + public Principal getPeerPrincipal() + { + return null; + } + + @Override + public int getMaxReadIdle() + { + return _maxReadIdle; + } + + @Override + public int getMaxWriteIdle() + { + return _maxWriteIdle; + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java new file mode 100644 index 0000000000..0e01bda8d0 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.client; + +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class HeartbeatTest extends QpidBrokerTestCase +{ + public void testHeartbeats() throws Exception + { + setTestSystemProperty("amqj.heartbeat.delay", "1"); + AMQConnection conn = (AMQConnection) getConnection(); + TestListener listener = new TestListener(); + conn.setHeartbeatListener(listener); + conn.start(); + + Thread.sleep(2500); + + assertTrue("Too few heartbeats received: "+listener._heartbeatsReceived+" (expected at least 2)", listener._heartbeatsReceived>=2); + assertTrue("Too few heartbeats sent "+listener._heartbeatsSent+" (expected at least 2)", listener._heartbeatsSent>=2); + + conn.close(); + } + + public void testNoHeartbeats() throws Exception + { + setTestSystemProperty("amqj.heartbeat.delay", "0"); + AMQConnection conn = (AMQConnection) getConnection(); + TestListener listener = new TestListener(); + conn.setHeartbeatListener(listener); + conn.start(); + + Thread.sleep(2500); + + assertEquals("Heartbeats unexpectedly received", 0, listener._heartbeatsReceived); + assertEquals("Heartbeats unexpectedly sent ", 0, listener._heartbeatsSent); + + conn.close(); + } + + public void testReadOnlyConnectionHeartbeats() throws Exception + { + setTestSystemProperty("amqj.heartbeat.delay","1"); + AMQConnection receiveConn = (AMQConnection) getConnection(); + AMQConnection sendConn = (AMQConnection) getConnection(); + Destination destination = getTestQueue(); + TestListener receiveListener = new TestListener(); + TestListener sendListener = new TestListener(); + + + Session receiveSession = receiveConn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Session senderSession = sendConn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + MessageConsumer consumer = receiveSession.createConsumer(destination); + MessageProducer producer = senderSession.createProducer(destination); + + receiveConn.setHeartbeatListener(receiveListener); + sendConn.setHeartbeatListener(sendListener); + receiveConn.start(); + + for(int i = 0; i < 5; i++) + { + producer.send(senderSession.createTextMessage("Msg " + i)); + Thread.sleep(500); + assertNotNull("Expected to received message", consumer.receive(500)); + } + + + + assertTrue("Too few heartbeats sent "+receiveListener._heartbeatsSent+" (expected at least 2)", receiveListener._heartbeatsSent>=2); + assertEquals("Unexpected sent at the sender: ",0,sendListener._heartbeatsSent); + + assertTrue("Too few heartbeats received at the sender "+sendListener._heartbeatsReceived+" (expected at least 2)", sendListener._heartbeatsReceived>=2); + assertEquals("Unexpected received at the receiver: ",0,receiveListener._heartbeatsReceived); + + receiveConn.close(); + sendConn.close(); + } + + private class TestListener implements HeartbeatListener + { + int _heartbeatsReceived; + int _heartbeatsSent; + @Override + public void heartbeatReceived() + { + _heartbeatsReceived++; + } + + @Override + public void heartbeatSent() + { + _heartbeatsSent++; + } + } +} diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index fa3a2bc262..bfcc06a1bc 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -181,3 +181,7 @@ org.apache.qpid.client.failover.MultipleBrokersFailoverTest#* // Uses Java broker specific configuration org.apache.qpid.client.ssl.SSLTest#testClientCertMissingWhilstWanting + +// QPID-2796 : Java 0-10 client only sends heartbeats in response to heartbeats from the server, not timeout based +org.apache.qpid.client.HeartbeatTest#testReadOnlyConnectionHeartbeats + diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes index ca2383a8f3..3f12076dbe 100755 --- a/qpid/java/test-profiles/Java010Excludes +++ b/qpid/java/test-profiles/Java010Excludes @@ -64,3 +64,6 @@ org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testFlowContro // QPID-3604: Immediate Prefetch no longer supported by 0-10 org.apache.qpid.client.AsynchMessageListenerTest#testImmediatePrefetchWithMessageListener + +// QPID-2796 : Java 0-10 client only sends heartbeats in response to heartbeats from the server, not timeout based +org.apache.qpid.client.HeartbeatTest#testReadOnlyConnectionHeartbeats |