package org.apache.qpid.messaging.util.failover; import org.apache.qpid.messaging.ConnectionException; import org.apache.qpid.messaging.Message; import org.apache.qpid.messaging.MessagingException; import org.apache.qpid.messaging.SenderException; import org.apache.qpid.messaging.Session; import org.apache.qpid.messaging.SessionException; import org.apache.qpid.messaging.TransportFailureException; import org.apache.qpid.messaging.internal.ConnectionEvent; import org.apache.qpid.messaging.internal.ConnectionEventListener; import org.apache.qpid.messaging.internal.SenderInternal; import org.apache.qpid.messaging.internal.SessionInternal; import org.apache.qpid.messaging.util.AbstractSenderDecorator; import org.apache.qpid.messaging.util.failover.SessionFailoverDecorator.SessionState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** *

A Decorator that adds failover and basic housekeeping tasks to a Sender. * This class adds, *

    *
  1. Failover support.
  2. *
  3. State management.
  4. *
  5. Exception handling.
  6. *
  7. Failover
  8. *

* *

Exception Handling
* This class will wrap each method call to it's delegate to handle error situations. * First it will check if the Receiver is already CLOSED or FAILOVER_IN_PROGRESS state. * If latter it will wait until the Sender is moved to OPENED, CLOSED or the timer expires. * For the last two cases a SenderException will be thrown and the Sender closed.

* *

TransportFailureException
* This class intercepts TransportFailureExceptions and are passed onto the session, * via the exception() method, which in turn passes into the connection. * The Sender will be marked as FAILOVER_IN_PROGRESS and the "operation" will be * blocked until the exception() on the Session object returns. At this point * the Sender is either moved to OPENED or CLOSED.

* *

SessionException
* For the time being, anytime a session exception is received, the Sender will be marked CLOSED. * We need to revisit this.

* *

Close() can be called by, *

    *
  1. The application (normal close).
  2. *
  3. By the session object, if close is called on it.(normal close)
  4. *
  5. By the connection object, if close is called on it.(normal close)
  6. *
  7. By the connection object, if failover was unsuccessful(error)
  8. *
  9. By itself (via the session) if it receives and exception (error).
  10. *
*

*/ public class SenderFailoverDecorator extends AbstractSenderDecorator implements ConnectionEventListener { private static Logger _logger = LoggerFactory.getLogger(SenderFailoverDecorator.class); public enum SenderState {OPENED, CLOSED, FAILOVER_IN_PROGRESS}; private SenderState _state = SenderState.OPENED; private long _failoverTimeout = Long.getLong("qpid.failover-timeout", 1000); private SenderException _lastException; private long _connSerialNumber = 0; public SenderFailoverDecorator(SessionInternal ssn, SenderInternal delegate) { super(ssn,delegate); synchronized(_connectionLock) { _connSerialNumber = ssn.getConnectionInternal().getSerialNumber(); } } @Override public void send(Message message, boolean sync) throws MessagingException { checkPreConditions(); long serialNumber = _connSerialNumber; // take a snapshot try { _delegate.send(message, sync); } catch (TransportFailureException e) { failover(e,serialNumber); send(message, sync); } catch (SessionException e) { throw handleSessionException(e); } } @Override public void close() throws MessagingException { synchronized (_connectionLock) { if (_state == SenderState.CLOSED) { throw new MessagingException("Sender is already closed"); } _state = SenderState.CLOSED; super.close(); } } @Override public void setCapacity(int capacity) throws MessagingException { checkPreConditions(); long serialNumber = _connSerialNumber; // take a snapshot try { _delegate.setCapacity(capacity); } catch (TransportFailureException e) { failover(e,serialNumber); setCapacity(capacity); } catch (SessionException e) { throw handleSessionException(e); } } @Override public int getCapacity() throws MessagingException { checkPreConditions(); long serialNumber = _connSerialNumber; // take a snapshot try { return _delegate.getCapacity(); } catch (TransportFailureException e) { failover(e,serialNumber); return getCapacity(); } catch (SessionException e) { throw handleSessionException(e); } } @Override public int getAvailable() throws MessagingException { checkPreConditions(); long serialNumber = _connSerialNumber; // take a snapshot try { return _delegate.getAvailable(); } catch (TransportFailureException e) { failover(e,serialNumber); return getAvailable(); } catch (SessionException e) { throw handleSessionException(e); } } @Override public int getUnsettled() throws MessagingException { checkPreConditions(); long serialNumber = _connSerialNumber; // take a snapshot try { return _delegate.getUnsettled(); } catch (TransportFailureException e) { failover(e,serialNumber); return getUnsettled(); } catch (SessionException e) { throw handleSessionException(e); } } @Override public boolean isClosed() throws MessagingException { return _state == SenderState.CLOSED; } @Override public String getName() throws MessagingException { checkPreConditions(); return getName(); } @Override public Session getSession() throws MessagingException { checkPreConditions(); _ssn.checkError(); return _ssn; } @Override public void recreate() throws MessagingException { synchronized(_connectionLock) { _connSerialNumber = _ssn.getConnectionInternal().getSerialNumber(); _delegate.recreate(); _state = SenderState.OPENED; } } @Override public void eventOccured(ConnectionEvent event) { synchronized (_connectionLock) { switch(event.getType()) { case PRE_FAILOVER: case CONNECTION_LOST: _state = SenderState.FAILOVER_IN_PROGRESS; break; case RECONNCTED: _state = SenderState.OPENED; break; case POST_FAILOVER: try { if (_state != SenderState.OPENED) { close(); } } catch (MessagingException e) { _logger.warn("Exception when trying to close the Sender", e); } _connectionLock.notifyAll(); break; default: break; //ignore the rest } } } @Override // From ConnectionEventListener public void exception(ConnectionException e) {// NOOP } protected void waitForFailoverToComplete() throws SenderException { synchronized (_connectionLock) { try { _connectionLock.wait(_failoverTimeout); } catch (InterruptedException e) { //ignore. } if (_state == SenderState.CLOSED) { throw new SenderException("Sender is closed. Failover was unsuccesfull",_lastException); } else if (_state == SenderState.FAILOVER_IN_PROGRESS) { closeInternal(); throw new SenderException("Sender is closed. Failover did not complete on time"); } } } protected void failover(TransportFailureException e, long serialNumber) throws SenderException { synchronized (_connectionLock) { if (_connSerialNumber > serialNumber) { return; // ignore, we already have failed over. } _state = SenderState.FAILOVER_IN_PROGRESS; _ssn.exception(e, serialNumber); // This triggers failover. waitForFailoverToComplete(); } } protected void checkPreConditions() throws SenderException { switch (_state) { case CLOSED: throw new SenderException("Sender is closed. You cannot invoke methods on a closed sender",_lastException); case FAILOVER_IN_PROGRESS: waitForFailoverToComplete(); } } /** * Session Exceptions will generally invalidate the Session. * TODO this needs to be revisited again. * A new session will need to be created in that case. * @param e * @throws MessagingException */ protected SenderException handleSessionException(SessionException e) { synchronized (_connectionLock) { // This should close all senders (including this) and Senders. _ssn.exception(e); } return new SenderException("Session has been closed",e); } /** Suppress Exceptions as */ private void closeInternal() { try { close(); } catch (Exception e) { //ignore } } }