diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java | 152 |
1 files changed, 109 insertions, 43 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index b314453e31..816caac824 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.client.protocol; +import org.apache.qpid.client.HeartbeatListener; import org.apache.qpid.util.BytesDataOutput; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +57,7 @@ import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; import java.io.IOException; @@ -177,6 +179,9 @@ public class AMQProtocolHandler implements ProtocolEngine private NetworkConnection _network; private Sender<ByteBuffer> _sender; + private long _lastReadTime = System.currentTimeMillis(); + private long _lastWriteTime = System.currentTimeMillis(); + private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT; /** * Creates a new protocol handler, associated with the specified client connection instance. @@ -210,48 +215,67 @@ public class AMQProtocolHandler implements ProtocolEngine } else { - _logger.debug("Session closed called with failover state currently " + _failoverState); - - // reconnetablility was introduced here so as not to disturb the client as they have made their intentions - // known through the policy settings. - - if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed()) + // Use local variable to keep flag whether fail-over allowed or not, + // in order to execute AMQConnection#exceptionRecievedout out of synchronization block, + // otherwise it might deadlock with failover mutex + boolean failoverNotAllowed = false; + synchronized (this) { - _logger.debug("FAILOVER STARTING"); - if (_failoverState == FailoverState.NOT_STARTED) - { - _failoverState = FailoverState.IN_PROGRESS; - startFailoverThread(); - } - else - { - _logger.debug("Not starting failover as state currently " + _failoverState); - } - } - else - { - _logger.debug("Failover not allowed by policy."); // or already in progress? - if (_logger.isDebugEnabled()) { - _logger.debug(_connection.getFailoverPolicy().toString()); + _logger.debug("Session closed called with failover state " + _failoverState); } - if (_failoverState != FailoverState.IN_PROGRESS) + // reconnetablility was introduced here so as not to disturb the client as they have made their intentions + // known through the policy settings. + if (_failoverState == FailoverState.NOT_STARTED) { - _logger.debug("sessionClose() not allowed to failover"); - _connection.exceptionReceived(new AMQDisconnectedException( - "Server closed connection and reconnection " + "not permitted.", - _stateManager.getLastException())); + // close the sender + try + { + _sender.close(); + } + catch (Exception e) + { + _logger.warn("Exception occured on closing the sender", e); + } + if (_connection.failoverAllowed()) + { + _failoverState = FailoverState.IN_PROGRESS; + + _logger.debug("FAILOVER STARTING"); + startFailoverThread(); + } + else if (_connection.isConnected()) + { + failoverNotAllowed = true; + if (_logger.isDebugEnabled()) + { + _logger.debug("Failover not allowed by policy:" + _connection.getFailoverPolicy()); + } + } + else + { + _logger.debug("We are in process of establishing the initial connection"); + } } else { - _logger.debug("sessionClose() failover in progress"); + _logger.debug("Not starting the failover thread as state currently " + _failoverState); } } + + if (failoverNotAllowed) + { + _connection.exceptionReceived(new AMQDisconnectedException( + "Server closed connection and reconnection not permitted.", _stateManager.getLastException())); + } } - _logger.debug("Protocol Session [" + this + "] closed"); + if (_logger.isDebugEnabled()) + { + _logger.debug("Protocol Session [" + this + "] closed"); + } } /** See {@link FailoverHandler} to see rationale for separate thread. */ @@ -280,7 +304,6 @@ public class AMQProtocolHandler implements ProtocolEngine { _logger.debug("Protocol Session [" + this + "] idle: reader"); // failover: - HeartbeatDiagnostics.timeout(); _logger.warn("Timed out while waiting for heartbeat from peer."); _network.close(); } @@ -289,7 +312,7 @@ public class AMQProtocolHandler implements ProtocolEngine { _logger.debug("Protocol Session [" + this + "] idle: reader"); writeFrame(HeartbeatBody.FRAME); - HeartbeatDiagnostics.sent(); + _heartbeatListener.heartbeatSent(); } /** @@ -297,14 +320,29 @@ public class AMQProtocolHandler implements ProtocolEngine */ public void exception(Throwable cause) { - if (_failoverState == FailoverState.NOT_STARTED) + boolean causeIsAConnectionProblem = + cause instanceof AMQConnectionClosedException || + cause instanceof IOException || + cause instanceof TransportException; + + if (causeIsAConnectionProblem) { - if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException) + //ensure the IoSender and IoReceiver are closed + try { - _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); - // this will attempt failover _network.close(); - closed(); + } + catch (Exception e) + { + //ignore + } + } + FailoverState state = getFailoverState(); + if (state == FailoverState.NOT_STARTED) + { + if (causeIsAConnectionProblem) + { + _logger.info("Connection exception caught therefore going to attempt failover: " + cause, cause); } else { @@ -319,7 +357,7 @@ public class AMQProtocolHandler implements ProtocolEngine } // we reach this point if failover was attempted and failed therefore we need to let the calling app // know since we cannot recover the situation - else if (_failoverState == FailoverState.FAILED) + else if (state == FailoverState.FAILED) { _logger.error("Exception caught by protocol handler: " + cause, cause); @@ -329,6 +367,10 @@ public class AMQProtocolHandler implements ProtocolEngine propagateExceptionToAllWaiters(amqe); _connection.exceptionReceived(cause); } + else + { + _logger.warn("Exception caught by protocol handler: " + cause, cause); + } } /** @@ -403,6 +445,7 @@ public class AMQProtocolHandler implements ProtocolEngine public void received(ByteBuffer msg) { _readBytes += msg.remaining(); + _lastReadTime = System.currentTimeMillis(); try { final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); @@ -431,8 +474,6 @@ public class AMQProtocolHandler implements ProtocolEngine final AMQBody bodyFrame = frame.getBodyFrame(); - HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); - bodyFrame.handle(frame.getChannel(), _protocolSession); _connection.bytesReceived(_readBytes); @@ -521,6 +562,7 @@ public class AMQProtocolHandler implements ProtocolEngine public synchronized void writeFrame(AMQDataBlock frame, boolean flush) { final ByteBuffer buf = asByteBuffer(frame); + _lastWriteTime = System.currentTimeMillis(); _writtenBytes += buf.remaining(); _sender.send(buf); if(flush) @@ -792,14 +834,14 @@ public class AMQProtocolHandler implements ProtocolEngine return _protocolSession; } - FailoverState getFailoverState() + synchronized FailoverState getFailoverState() { return _failoverState; } - public void setFailoverState(FailoverState failoverState) + public synchronized void setFailoverState(FailoverState failoverState) { - _failoverState = failoverState; + _failoverState= failoverState; } public byte getProtocolMajorVersion() @@ -843,6 +885,23 @@ public class AMQProtocolHandler implements ProtocolEngine _sender = sender; } + @Override + public long getLastReadTime() + { + return _lastReadTime; + } + + @Override + public long getLastWriteTime() + { + return _lastWriteTime; + } + + protected Sender<ByteBuffer> getSender() + { + return _sender; + } + /** @param delay delay in seconds (not ms) */ void initHeartbeats(int delay) { @@ -850,7 +909,6 @@ public class AMQProtocolHandler implements ProtocolEngine { _network.setMaxWriteIdle(delay); _network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); - HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); } } @@ -865,5 +923,13 @@ public class AMQProtocolHandler implements ProtocolEngine } + public void setHeartbeatListener(HeartbeatListener listener) + { + _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener; + } + public void heartbeatBodyReceived() + { + _heartbeatListener.heartbeatReceived(); + } } |