summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/protocol')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java152
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java125
3 files changed, 117 insertions, 169 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index b314453e31..816caac824 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.client.protocol;
+import org.apache.qpid.client.HeartbeatListener;
import org.apache.qpid.util.BytesDataOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +57,7 @@ import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
import java.io.IOException;
@@ -177,6 +179,9 @@ public class AMQProtocolHandler implements ProtocolEngine
private NetworkConnection _network;
private Sender<ByteBuffer> _sender;
+ private long _lastReadTime = System.currentTimeMillis();
+ private long _lastWriteTime = System.currentTimeMillis();
+ private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
/**
* Creates a new protocol handler, associated with the specified client connection instance.
@@ -210,48 +215,67 @@ public class AMQProtocolHandler implements ProtocolEngine
}
else
{
- _logger.debug("Session closed called with failover state currently " + _failoverState);
-
- // reconnetablility was introduced here so as not to disturb the client as they have made their intentions
- // known through the policy settings.
-
- if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed())
+ // Use local variable to keep flag whether fail-over allowed or not,
+ // in order to execute AMQConnection#exceptionRecievedout out of synchronization block,
+ // otherwise it might deadlock with failover mutex
+ boolean failoverNotAllowed = false;
+ synchronized (this)
{
- _logger.debug("FAILOVER STARTING");
- if (_failoverState == FailoverState.NOT_STARTED)
- {
- _failoverState = FailoverState.IN_PROGRESS;
- startFailoverThread();
- }
- else
- {
- _logger.debug("Not starting failover as state currently " + _failoverState);
- }
- }
- else
- {
- _logger.debug("Failover not allowed by policy."); // or already in progress?
-
if (_logger.isDebugEnabled())
{
- _logger.debug(_connection.getFailoverPolicy().toString());
+ _logger.debug("Session closed called with failover state " + _failoverState);
}
- if (_failoverState != FailoverState.IN_PROGRESS)
+ // reconnetablility was introduced here so as not to disturb the client as they have made their intentions
+ // known through the policy settings.
+ if (_failoverState == FailoverState.NOT_STARTED)
{
- _logger.debug("sessionClose() not allowed to failover");
- _connection.exceptionReceived(new AMQDisconnectedException(
- "Server closed connection and reconnection " + "not permitted.",
- _stateManager.getLastException()));
+ // close the sender
+ try
+ {
+ _sender.close();
+ }
+ catch (Exception e)
+ {
+ _logger.warn("Exception occured on closing the sender", e);
+ }
+ if (_connection.failoverAllowed())
+ {
+ _failoverState = FailoverState.IN_PROGRESS;
+
+ _logger.debug("FAILOVER STARTING");
+ startFailoverThread();
+ }
+ else if (_connection.isConnected())
+ {
+ failoverNotAllowed = true;
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Failover not allowed by policy:" + _connection.getFailoverPolicy());
+ }
+ }
+ else
+ {
+ _logger.debug("We are in process of establishing the initial connection");
+ }
}
else
{
- _logger.debug("sessionClose() failover in progress");
+ _logger.debug("Not starting the failover thread as state currently " + _failoverState);
}
}
+
+ if (failoverNotAllowed)
+ {
+ _connection.exceptionReceived(new AMQDisconnectedException(
+ "Server closed connection and reconnection not permitted.", _stateManager.getLastException()));
+ }
}
- _logger.debug("Protocol Session [" + this + "] closed");
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Protocol Session [" + this + "] closed");
+ }
}
/** See {@link FailoverHandler} to see rationale for separate thread. */
@@ -280,7 +304,6 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_logger.debug("Protocol Session [" + this + "] idle: reader");
// failover:
- HeartbeatDiagnostics.timeout();
_logger.warn("Timed out while waiting for heartbeat from peer.");
_network.close();
}
@@ -289,7 +312,7 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_logger.debug("Protocol Session [" + this + "] idle: reader");
writeFrame(HeartbeatBody.FRAME);
- HeartbeatDiagnostics.sent();
+ _heartbeatListener.heartbeatSent();
}
/**
@@ -297,14 +320,29 @@ public class AMQProtocolHandler implements ProtocolEngine
*/
public void exception(Throwable cause)
{
- if (_failoverState == FailoverState.NOT_STARTED)
+ boolean causeIsAConnectionProblem =
+ cause instanceof AMQConnectionClosedException ||
+ cause instanceof IOException ||
+ cause instanceof TransportException;
+
+ if (causeIsAConnectionProblem)
{
- if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException)
+ //ensure the IoSender and IoReceiver are closed
+ try
{
- _logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
- // this will attempt failover
_network.close();
- closed();
+ }
+ catch (Exception e)
+ {
+ //ignore
+ }
+ }
+ FailoverState state = getFailoverState();
+ if (state == FailoverState.NOT_STARTED)
+ {
+ if (causeIsAConnectionProblem)
+ {
+ _logger.info("Connection exception caught therefore going to attempt failover: " + cause, cause);
}
else
{
@@ -319,7 +357,7 @@ public class AMQProtocolHandler implements ProtocolEngine
}
// we reach this point if failover was attempted and failed therefore we need to let the calling app
// know since we cannot recover the situation
- else if (_failoverState == FailoverState.FAILED)
+ else if (state == FailoverState.FAILED)
{
_logger.error("Exception caught by protocol handler: " + cause, cause);
@@ -329,6 +367,10 @@ public class AMQProtocolHandler implements ProtocolEngine
propagateExceptionToAllWaiters(amqe);
_connection.exceptionReceived(cause);
}
+ else
+ {
+ _logger.warn("Exception caught by protocol handler: " + cause, cause);
+ }
}
/**
@@ -403,6 +445,7 @@ public class AMQProtocolHandler implements ProtocolEngine
public void received(ByteBuffer msg)
{
_readBytes += msg.remaining();
+ _lastReadTime = System.currentTimeMillis();
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
@@ -431,8 +474,6 @@ public class AMQProtocolHandler implements ProtocolEngine
final AMQBody bodyFrame = frame.getBodyFrame();
- HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
-
bodyFrame.handle(frame.getChannel(), _protocolSession);
_connection.bytesReceived(_readBytes);
@@ -521,6 +562,7 @@ public class AMQProtocolHandler implements ProtocolEngine
public synchronized void writeFrame(AMQDataBlock frame, boolean flush)
{
final ByteBuffer buf = asByteBuffer(frame);
+ _lastWriteTime = System.currentTimeMillis();
_writtenBytes += buf.remaining();
_sender.send(buf);
if(flush)
@@ -792,14 +834,14 @@ public class AMQProtocolHandler implements ProtocolEngine
return _protocolSession;
}
- FailoverState getFailoverState()
+ synchronized FailoverState getFailoverState()
{
return _failoverState;
}
- public void setFailoverState(FailoverState failoverState)
+ public synchronized void setFailoverState(FailoverState failoverState)
{
- _failoverState = failoverState;
+ _failoverState= failoverState;
}
public byte getProtocolMajorVersion()
@@ -843,6 +885,23 @@ public class AMQProtocolHandler implements ProtocolEngine
_sender = sender;
}
+ @Override
+ public long getLastReadTime()
+ {
+ return _lastReadTime;
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return _lastWriteTime;
+ }
+
+ protected Sender<ByteBuffer> getSender()
+ {
+ return _sender;
+ }
+
/** @param delay delay in seconds (not ms) */
void initHeartbeats(int delay)
{
@@ -850,7 +909,6 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_network.setMaxWriteIdle(delay);
_network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
- HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
}
}
@@ -865,5 +923,13 @@ public class AMQProtocolHandler implements ProtocolEngine
}
+ public void setHeartbeatListener(HeartbeatListener listener)
+ {
+ _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener;
+ }
+ public void heartbeatBodyReceived()
+ {
+ _heartbeatListener.heartbeatReceived();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index af57fd98fc..aed10cf15f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -48,6 +48,8 @@ import org.apache.qpid.transport.TransportException;
import javax.jms.JMSException;
import javax.security.sasl.SaslClient;
+
+import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -265,7 +267,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException
{
-
+ _protocolHandler.heartbeatBodyReceived();
}
/**
@@ -372,6 +374,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
}
+ public Sender<ByteBuffer> getSender()
+ {
+ return _protocolHandler.getSender();
+ }
+
public void failover(String host, int port)
{
_protocolHandler.failover(host, port);
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java b/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java
deleted file mode 100644
index d387a8ba93..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.protocol;
-
-class HeartbeatDiagnostics
-{
- private static final Diagnostics _impl = init();
-
- private HeartbeatDiagnostics()
- {
- }
-
- private static Diagnostics init()
- {
- return Boolean.getBoolean("amqj.heartbeat.diagnostics") ? new On() : new Off();
- }
-
- static void sent()
- {
- _impl.sent();
- }
-
- static void timeout()
- {
- _impl.timeout();
- }
-
- static void received(boolean heartbeat)
- {
- _impl.received(heartbeat);
- }
-
- static void init(int delay, int timeout)
- {
- _impl.init(delay, timeout);
- }
-
- private static interface Diagnostics
- {
- void sent();
- void timeout();
- void received(boolean heartbeat);
- void init(int delay, int timeout);
- }
-
- private static class On implements Diagnostics
- {
- private final String[] messages = new String[50];
- private int i;
-
- private void save(String msg)
- {
- messages[i++] = msg;
- if(i >= messages.length){
- i = 0;//i.e. a circular buffer
- }
- }
-
- public void sent()
- {
- save(System.currentTimeMillis() + ": sent heartbeat");
- }
-
- public void timeout()
- {
- for(int i = 0; i < messages.length; i++)
- {
- if(messages[i] != null)
- {
- System.out.println(messages[i]);
- }
- }
- System.out.println(System.currentTimeMillis() + ": timed out");
- }
-
- public void received(boolean heartbeat)
- {
- save(System.currentTimeMillis() + ": received " + (heartbeat ? "heartbeat" : "data"));
- }
-
- public void init(int delay, int timeout)
- {
- System.out.println(System.currentTimeMillis() + ": initialised delay=" + delay + ", timeout=" + timeout);
- }
- }
-
- private static class Off implements Diagnostics
- {
- public void sent()
- {
-
- }
- public void timeout()
- {
-
- }
- public void received(boolean heartbeat)
- {
-
- }
-
- public void init(int delay, int timeout)
- {
-
- }
- }
-}