summaryrefslogtreecommitdiff
path: root/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java')
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java66
1 files changed, 57 insertions, 9 deletions
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java
index 79f87db4d5..ae9296ab0b 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java
@@ -3,7 +3,6 @@ package org.apache.qpid.messaging.util.failover;
import org.apache.qpid.messaging.ConnectionException;
import org.apache.qpid.messaging.Message;
import org.apache.qpid.messaging.MessagingException;
-import org.apache.qpid.messaging.ReceiverException;
import org.apache.qpid.messaging.SenderException;
import org.apache.qpid.messaging.Session;
import org.apache.qpid.messaging.SessionException;
@@ -13,16 +12,46 @@ import org.apache.qpid.messaging.internal.ConnectionEventListener;
import org.apache.qpid.messaging.internal.SenderInternal;
import org.apache.qpid.messaging.internal.SessionInternal;
import org.apache.qpid.messaging.util.AbstractSenderDecorator;
+import org.apache.qpid.messaging.util.failover.SessionFailoverDecorator.SessionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A Decorator that adds basic housekeeping tasks to a Sender.
+ * <p>A Decorator that adds failover and basic housekeeping tasks to a Sender.
* This class adds,
- * 1. State management.
- * 2. Exception handling.
- * 3. Failover
+ * <ol>
+ * <li>Failover support.</li>
+ * <li>State management.</li>
+ * <li>Exception handling.</li>
+ * <li>Failover</li>
+ * </ol></p>
*
+ * <p><b>Exception Handling</b><br>
+ * This class will wrap each method call to it's delegate to handle error situations.
+ * First it will check if the Receiver is already CLOSED or FAILOVER_IN_PROGRESS state.
+ * If latter it will wait until the Sender is moved to OPENED, CLOSED or the timer expires.
+ * For the last two cases a SenderException will be thrown and the Sender closed.</p>
+ *
+ * <p><b>TransportFailureException</b><br>
+ * This class intercepts TransportFailureExceptions and are passed onto the session,
+ * via the exception() method, which in turn passes into the connection.
+ * The Sender will be marked as FAILOVER_IN_PROGRESS and the "operation" will be
+ * blocked until the exception() on the Session object returns. At this point
+ * the Sender is either moved to OPENED or CLOSED.</p>
+ *
+ * <p><b>SessionException</b><br>
+ * For the time being, anytime a session exception is received, the Sender will be marked CLOSED.
+ * We need to revisit this.</p>
+ *
+ * <p><i> <b>Close() can be called by,</b>
+ * <ol>
+ * <li>The application (normal close).</li>
+ * <li>By the session object, if close is called on it.(normal close)</li>
+ * <li>By the connection object, if close is called on it.(normal close)</li>
+ * <li>By the connection object, if failover was unsuccessful(error)</li>
+ * <li>By itself (via the session) if it receives and exception (error).</li>
+ * </ol>
+ * </i></p>
*/
public class SenderFailoverDecorator extends AbstractSenderDecorator implements ConnectionEventListener
{
@@ -32,7 +61,7 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements
private SenderState _state = SenderState.OPENED;
private long _failoverTimeout = Long.getLong("qpid.failover-timeout", 1000);
- private ReceiverException _lastException;
+ private SenderException _lastException;
private long _connSerialNumber = 0;
public SenderFailoverDecorator(SessionInternal ssn, SenderInternal delegate)
@@ -186,6 +215,7 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements
{
_connSerialNumber = _ssn.getConnectionInternal().getSerialNumber();
_delegate.recreate();
+ _state = SenderState.OPENED;
}
}
@@ -213,7 +243,7 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements
}
catch (MessagingException e)
{
- _logger.warn("Exception when trying to close the receiver", e);
+ _logger.warn("Exception when trying to close the Sender", e);
}
_connectionLock.notifyAll();
break;
@@ -242,7 +272,12 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements
}
if (_state == SenderState.CLOSED)
{
- throw new SenderException("Receiver is closed. Failover was unsuccesfull",_lastException);
+ throw new SenderException("Sender is closed. Failover was unsuccesfull",_lastException);
+ }
+ else if (_state == SenderState.FAILOVER_IN_PROGRESS)
+ {
+ closeInternal();
+ throw new SenderException("Sender is closed. Failover did not complete on time");
}
}
}
@@ -283,9 +318,22 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements
{
synchronized (_connectionLock)
{
- // This should close all senders (including this) and receivers.
+ // This should close all senders (including this) and Senders.
_ssn.exception(e);
}
return new SenderException("Session has been closed",e);
}
+
+ /** Suppress Exceptions as */
+ private void closeInternal()
+ {
+ try
+ {
+ close();
+ }
+ catch (Exception e)
+ {
+ //ignore
+ }
+ }
}