summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java152
1 files changed, 109 insertions, 43 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index b314453e31..816caac824 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.client.protocol;
+import org.apache.qpid.client.HeartbeatListener;
import org.apache.qpid.util.BytesDataOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +57,7 @@ import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
import java.io.IOException;
@@ -177,6 +179,9 @@ public class AMQProtocolHandler implements ProtocolEngine
private NetworkConnection _network;
private Sender<ByteBuffer> _sender;
+ private long _lastReadTime = System.currentTimeMillis();
+ private long _lastWriteTime = System.currentTimeMillis();
+ private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
/**
* Creates a new protocol handler, associated with the specified client connection instance.
@@ -210,48 +215,67 @@ public class AMQProtocolHandler implements ProtocolEngine
}
else
{
- _logger.debug("Session closed called with failover state currently " + _failoverState);
-
- // reconnetablility was introduced here so as not to disturb the client as they have made their intentions
- // known through the policy settings.
-
- if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed())
+ // Use local variable to keep flag whether fail-over allowed or not,
+ // in order to execute AMQConnection#exceptionRecievedout out of synchronization block,
+ // otherwise it might deadlock with failover mutex
+ boolean failoverNotAllowed = false;
+ synchronized (this)
{
- _logger.debug("FAILOVER STARTING");
- if (_failoverState == FailoverState.NOT_STARTED)
- {
- _failoverState = FailoverState.IN_PROGRESS;
- startFailoverThread();
- }
- else
- {
- _logger.debug("Not starting failover as state currently " + _failoverState);
- }
- }
- else
- {
- _logger.debug("Failover not allowed by policy."); // or already in progress?
-
if (_logger.isDebugEnabled())
{
- _logger.debug(_connection.getFailoverPolicy().toString());
+ _logger.debug("Session closed called with failover state " + _failoverState);
}
- if (_failoverState != FailoverState.IN_PROGRESS)
+ // reconnetablility was introduced here so as not to disturb the client as they have made their intentions
+ // known through the policy settings.
+ if (_failoverState == FailoverState.NOT_STARTED)
{
- _logger.debug("sessionClose() not allowed to failover");
- _connection.exceptionReceived(new AMQDisconnectedException(
- "Server closed connection and reconnection " + "not permitted.",
- _stateManager.getLastException()));
+ // close the sender
+ try
+ {
+ _sender.close();
+ }
+ catch (Exception e)
+ {
+ _logger.warn("Exception occured on closing the sender", e);
+ }
+ if (_connection.failoverAllowed())
+ {
+ _failoverState = FailoverState.IN_PROGRESS;
+
+ _logger.debug("FAILOVER STARTING");
+ startFailoverThread();
+ }
+ else if (_connection.isConnected())
+ {
+ failoverNotAllowed = true;
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Failover not allowed by policy:" + _connection.getFailoverPolicy());
+ }
+ }
+ else
+ {
+ _logger.debug("We are in process of establishing the initial connection");
+ }
}
else
{
- _logger.debug("sessionClose() failover in progress");
+ _logger.debug("Not starting the failover thread as state currently " + _failoverState);
}
}
+
+ if (failoverNotAllowed)
+ {
+ _connection.exceptionReceived(new AMQDisconnectedException(
+ "Server closed connection and reconnection not permitted.", _stateManager.getLastException()));
+ }
}
- _logger.debug("Protocol Session [" + this + "] closed");
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Protocol Session [" + this + "] closed");
+ }
}
/** See {@link FailoverHandler} to see rationale for separate thread. */
@@ -280,7 +304,6 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_logger.debug("Protocol Session [" + this + "] idle: reader");
// failover:
- HeartbeatDiagnostics.timeout();
_logger.warn("Timed out while waiting for heartbeat from peer.");
_network.close();
}
@@ -289,7 +312,7 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_logger.debug("Protocol Session [" + this + "] idle: reader");
writeFrame(HeartbeatBody.FRAME);
- HeartbeatDiagnostics.sent();
+ _heartbeatListener.heartbeatSent();
}
/**
@@ -297,14 +320,29 @@ public class AMQProtocolHandler implements ProtocolEngine
*/
public void exception(Throwable cause)
{
- if (_failoverState == FailoverState.NOT_STARTED)
+ boolean causeIsAConnectionProblem =
+ cause instanceof AMQConnectionClosedException ||
+ cause instanceof IOException ||
+ cause instanceof TransportException;
+
+ if (causeIsAConnectionProblem)
{
- if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException)
+ //ensure the IoSender and IoReceiver are closed
+ try
{
- _logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
- // this will attempt failover
_network.close();
- closed();
+ }
+ catch (Exception e)
+ {
+ //ignore
+ }
+ }
+ FailoverState state = getFailoverState();
+ if (state == FailoverState.NOT_STARTED)
+ {
+ if (causeIsAConnectionProblem)
+ {
+ _logger.info("Connection exception caught therefore going to attempt failover: " + cause, cause);
}
else
{
@@ -319,7 +357,7 @@ public class AMQProtocolHandler implements ProtocolEngine
}
// we reach this point if failover was attempted and failed therefore we need to let the calling app
// know since we cannot recover the situation
- else if (_failoverState == FailoverState.FAILED)
+ else if (state == FailoverState.FAILED)
{
_logger.error("Exception caught by protocol handler: " + cause, cause);
@@ -329,6 +367,10 @@ public class AMQProtocolHandler implements ProtocolEngine
propagateExceptionToAllWaiters(amqe);
_connection.exceptionReceived(cause);
}
+ else
+ {
+ _logger.warn("Exception caught by protocol handler: " + cause, cause);
+ }
}
/**
@@ -403,6 +445,7 @@ public class AMQProtocolHandler implements ProtocolEngine
public void received(ByteBuffer msg)
{
_readBytes += msg.remaining();
+ _lastReadTime = System.currentTimeMillis();
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
@@ -431,8 +474,6 @@ public class AMQProtocolHandler implements ProtocolEngine
final AMQBody bodyFrame = frame.getBodyFrame();
- HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
-
bodyFrame.handle(frame.getChannel(), _protocolSession);
_connection.bytesReceived(_readBytes);
@@ -521,6 +562,7 @@ public class AMQProtocolHandler implements ProtocolEngine
public synchronized void writeFrame(AMQDataBlock frame, boolean flush)
{
final ByteBuffer buf = asByteBuffer(frame);
+ _lastWriteTime = System.currentTimeMillis();
_writtenBytes += buf.remaining();
_sender.send(buf);
if(flush)
@@ -792,14 +834,14 @@ public class AMQProtocolHandler implements ProtocolEngine
return _protocolSession;
}
- FailoverState getFailoverState()
+ synchronized FailoverState getFailoverState()
{
return _failoverState;
}
- public void setFailoverState(FailoverState failoverState)
+ public synchronized void setFailoverState(FailoverState failoverState)
{
- _failoverState = failoverState;
+ _failoverState= failoverState;
}
public byte getProtocolMajorVersion()
@@ -843,6 +885,23 @@ public class AMQProtocolHandler implements ProtocolEngine
_sender = sender;
}
+ @Override
+ public long getLastReadTime()
+ {
+ return _lastReadTime;
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return _lastWriteTime;
+ }
+
+ protected Sender<ByteBuffer> getSender()
+ {
+ return _sender;
+ }
+
/** @param delay delay in seconds (not ms) */
void initHeartbeats(int delay)
{
@@ -850,7 +909,6 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_network.setMaxWriteIdle(delay);
_network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
- HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
}
}
@@ -865,5 +923,13 @@ public class AMQProtocolHandler implements ProtocolEngine
}
+ public void setHeartbeatListener(HeartbeatListener listener)
+ {
+ _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener;
+ }
+ public void heartbeatBodyReceived()
+ {
+ _heartbeatListener.heartbeatReceived();
+ }
}