diff options
Diffstat (limited to 'qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java')
-rw-r--r-- | qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java | 66 |
1 files changed, 57 insertions, 9 deletions
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java index 79f87db4d5..ae9296ab0b 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java @@ -3,7 +3,6 @@ package org.apache.qpid.messaging.util.failover; import org.apache.qpid.messaging.ConnectionException; import org.apache.qpid.messaging.Message; import org.apache.qpid.messaging.MessagingException; -import org.apache.qpid.messaging.ReceiverException; import org.apache.qpid.messaging.SenderException; import org.apache.qpid.messaging.Session; import org.apache.qpid.messaging.SessionException; @@ -13,16 +12,46 @@ import org.apache.qpid.messaging.internal.ConnectionEventListener; import org.apache.qpid.messaging.internal.SenderInternal; import org.apache.qpid.messaging.internal.SessionInternal; import org.apache.qpid.messaging.util.AbstractSenderDecorator; +import org.apache.qpid.messaging.util.failover.SessionFailoverDecorator.SessionState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A Decorator that adds basic housekeeping tasks to a Sender. + * <p>A Decorator that adds failover and basic housekeeping tasks to a Sender. * This class adds, - * 1. State management. - * 2. Exception handling. - * 3. Failover + * <ol> + * <li>Failover support.</li> + * <li>State management.</li> + * <li>Exception handling.</li> + * <li>Failover</li> + * </ol></p> * + * <p><b>Exception Handling</b><br> + * This class will wrap each method call to it's delegate to handle error situations. + * First it will check if the Receiver is already CLOSED or FAILOVER_IN_PROGRESS state. + * If latter it will wait until the Sender is moved to OPENED, CLOSED or the timer expires. + * For the last two cases a SenderException will be thrown and the Sender closed.</p> + * + * <p><b>TransportFailureException</b><br> + * This class intercepts TransportFailureExceptions and are passed onto the session, + * via the exception() method, which in turn passes into the connection. + * The Sender will be marked as FAILOVER_IN_PROGRESS and the "operation" will be + * blocked until the exception() on the Session object returns. At this point + * the Sender is either moved to OPENED or CLOSED.</p> + * + * <p><b>SessionException</b><br> + * For the time being, anytime a session exception is received, the Sender will be marked CLOSED. + * We need to revisit this.</p> + * + * <p><i> <b>Close() can be called by,</b> + * <ol> + * <li>The application (normal close).</li> + * <li>By the session object, if close is called on it.(normal close)</li> + * <li>By the connection object, if close is called on it.(normal close)</li> + * <li>By the connection object, if failover was unsuccessful(error)</li> + * <li>By itself (via the session) if it receives and exception (error).</li> + * </ol> + * </i></p> */ public class SenderFailoverDecorator extends AbstractSenderDecorator implements ConnectionEventListener { @@ -32,7 +61,7 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements private SenderState _state = SenderState.OPENED; private long _failoverTimeout = Long.getLong("qpid.failover-timeout", 1000); - private ReceiverException _lastException; + private SenderException _lastException; private long _connSerialNumber = 0; public SenderFailoverDecorator(SessionInternal ssn, SenderInternal delegate) @@ -186,6 +215,7 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements { _connSerialNumber = _ssn.getConnectionInternal().getSerialNumber(); _delegate.recreate(); + _state = SenderState.OPENED; } } @@ -213,7 +243,7 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements } catch (MessagingException e) { - _logger.warn("Exception when trying to close the receiver", e); + _logger.warn("Exception when trying to close the Sender", e); } _connectionLock.notifyAll(); break; @@ -242,7 +272,12 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements } if (_state == SenderState.CLOSED) { - throw new SenderException("Receiver is closed. Failover was unsuccesfull",_lastException); + throw new SenderException("Sender is closed. Failover was unsuccesfull",_lastException); + } + else if (_state == SenderState.FAILOVER_IN_PROGRESS) + { + closeInternal(); + throw new SenderException("Sender is closed. Failover did not complete on time"); } } } @@ -283,9 +318,22 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements { synchronized (_connectionLock) { - // This should close all senders (including this) and receivers. + // This should close all senders (including this) and Senders. _ssn.exception(e); } return new SenderException("Session has been closed",e); } + + /** Suppress Exceptions as */ + private void closeInternal() + { + try + { + close(); + } + catch (Exception e) + { + //ignore + } + } } |