From cba3d4c40163ec7592bdee548bf776b848c69ca6 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Tue, 26 Jun 2012 15:54:25 +0000 Subject: QPID-4027 Added javadoc, fixed typos, and some bugs. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/address-refactor2@1354075 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/messaging/internal/MessageInternal.java | 2 +- .../qpid/messaging/internal/ReceiverInternal.java | 3 + .../qpid/messaging/internal/SenderInternal.java | 3 + .../qpid/messaging/internal/SessionInternal.java | 3 + .../messaging/util/MessageFactory_AMQP_0_10.java | 2 +- .../util/failover/ConnectionFailoverDecorator.java | 69 ++++++++++++++-------- .../util/failover/ReceiverFailoverDecorator.java | 56 +++++++++++++++++- .../util/failover/SenderFailoverDecorator.java | 66 ++++++++++++++++++--- .../util/failover/SessionFailoverDecorator.java | 57 +++++++++++++++++- 9 files changed, 223 insertions(+), 38 deletions(-) diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/MessageInternal.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/MessageInternal.java index 9ee0b5a032..7273bc3c05 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/MessageInternal.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/MessageInternal.java @@ -46,7 +46,7 @@ public interface MessageInternal extends Message * The calling Object should know how to cast the generic object * to the required type. * Ex @see {@link CppMessageFactory#CppMessageDelegate} - * and @see {@link CppSender + * and @see {@link CppSender} */ public Object getFactorySpecificMessageDelegate(); } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ReceiverInternal.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ReceiverInternal.java index d81e815112..86639303db 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ReceiverInternal.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ReceiverInternal.java @@ -20,6 +20,9 @@ package org.apache.qpid.messaging.internal; import org.apache.qpid.messaging.MessagingException; import org.apache.qpid.messaging.Receiver; +/** + * An extended interface meant for API implementors. + */ public interface ReceiverInternal extends Receiver { public void recreate() throws MessagingException; diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/SenderInternal.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/SenderInternal.java index 586cfccffd..747ebc183b 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/SenderInternal.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/SenderInternal.java @@ -20,6 +20,9 @@ package org.apache.qpid.messaging.internal; import org.apache.qpid.messaging.MessagingException; import org.apache.qpid.messaging.Sender; +/** + * An extended interface meant for API implementors. + */ public interface SenderInternal extends Sender { public void recreate() throws MessagingException; diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/SessionInternal.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/SessionInternal.java index c063e8d3c3..51c7add591 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/SessionInternal.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/SessionInternal.java @@ -22,6 +22,9 @@ import org.apache.qpid.messaging.Session; import org.apache.qpid.messaging.SessionException; import org.apache.qpid.messaging.TransportFailureException; +/** + * An extended interface meant for API implementors. + */ public interface SessionInternal extends Session { public ConnectionInternal getConnectionInternal(); diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/MessageFactory_AMQP_0_10.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/MessageFactory_AMQP_0_10.java index cbe18796f6..1b6a2cdc53 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/MessageFactory_AMQP_0_10.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/MessageFactory_AMQP_0_10.java @@ -19,7 +19,7 @@ import org.apache.qpid.util.UUIDGen; import org.apache.qpid.util.UUIDs; /** - * A generic message factory that is based on the AMQO 0-10 encoding. + * A generic message factory that is based on the AMQP 0-10 encoding. * */ public class MessageFactory_AMQP_0_10 extends AbstractMessageFactory diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ConnectionFailoverDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ConnectionFailoverDecorator.java index 4f7b09b5c2..7dc2f7aa4b 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ConnectionFailoverDecorator.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ConnectionFailoverDecorator.java @@ -39,14 +39,36 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * - * Closing after unsuccessful failover is not done yet! - * + *

A Decorator that adds failover and basic housekeeping tasks to a connection. + * This class adds, + *

    + *
  1. Failover support.
  2. + *
  3. Basic session mgt (tracking, default name generation ..etc).
  4. + *
  5. Connection state management.
  6. + *
  7. Error handling.
  8. + *

+ * + *

Close() can be called by, + *

    + *
  1. The application (normal close)
  2. + *
  3. By this object if failover is unsuccessful(error)
  4. + *
+ *

+ * + *

Failover
+ * This class intercepts TransportFailureExceptions immeidately notifies + * it's children that the connection was lost. Any calls made will wait + * until the connection is moved back to OPENED, CLOSED or the timer expires. + * It then attempts failover if it's allowed by the selected FailoverStrategy. + * If failover is successful it will move the connection to OPENED + * and the blocked operations will continue. + * If failover was not allowed or unsuccessful it will close the connection + * and a ConnectionException will be thrown to the client.

*/ public class ConnectionFailoverDecorator extends AbstractConnectionDecorator { private static Logger _logger = LoggerFactory.getLogger(ConnectionFailoverDecorator.class); - + public enum ConnectionState { UNDEFINED, @@ -201,8 +223,8 @@ public class ConnectionFailoverDecorator extends AbstractConnectionDecorator failover(e,serialNumber); } catch(ConnectionException ex) - { - //ignore. + { + //ignore. //failover() handles notifications } } @@ -227,36 +249,36 @@ public class ConnectionFailoverDecorator extends AbstractConnectionDecorator throw new ConnectionException("Connection is closed. You cannot invoke methods on a closed connection"); case UNDEFINED: throw new ConnectionException("Connection should be opened before it can be used"); - case CONNECTION_LOST: + case CONNECTION_LOST: case FAILOVER_IN_PROGRESS: waitForFailoverToComplete(); } } - + protected void failover(TransportFailureException e,long serialNumber) throws ConnectionException { synchronized(_connectionLock) - { + { if (_serialNumber > serialNumber) { return; // Ignore, We have a working connection now. } - + _logger.warn("Connection lost!"); _state = ConnectionState.CONNECTION_LOST; - notifyEvent(new ConnectionEvent(this,EventType.CONNECTION_LOST,this)); - + notifyEvent(new ConnectionEvent(this,EventType.CONNECTION_LOST,this)); + if (_failoverStrategy.failoverAllowed()) { // Failover is allowed at least once. _state = ConnectionState.FAILOVER_IN_PROGRESS; notifyEvent(new ConnectionEvent(this,EventType.PRE_FAILOVER,this)); - - - + + + StringBuffer errorMsg = new StringBuffer(); while (_failoverStrategy.failoverAllowed()) - { + { try { ConnectionString conString = _failoverStrategy.getNextConnectionString(); @@ -267,6 +289,7 @@ public class ConnectionFailoverDecorator extends AbstractConnectionDecorator { recreate(); _state = ConnectionState.OPENED; + _logger.warn("Successfully connected to " + conString.getUrl()); _lastException = null; } catch (MessagingException ex) @@ -286,12 +309,12 @@ public class ConnectionFailoverDecorator extends AbstractConnectionDecorator notifyEvent(new ConnectionEvent(this,EventType.RECONNCTION_FAILED,this)); } } - + if (_state != ConnectionState.OPENED) { closeInternal(); _lastException = new ConnectionException("Failover was unsuccessful." + errorMsg.toString()); - _logger.warn("Faiolver was unsuccesful" + errorMsg.toString()); + _logger.warn("Failover was unsuccesful" + errorMsg.toString()); } notifyEvent(new ConnectionEvent(this,EventType.POST_FAILOVER,this)); } @@ -302,9 +325,9 @@ public class ConnectionFailoverDecorator extends AbstractConnectionDecorator _lastException = new ConnectionException("Connection Failed!",e); _logger.warn("Connection Failed!", e); } - + _connectionLock.notifyAll(); - + if (_lastException != null) { for (ConnectionEventListener l: _stateListeners) @@ -315,7 +338,7 @@ public class ConnectionFailoverDecorator extends AbstractConnectionDecorator } } } - + protected void waitForFailoverToComplete() throws ConnectionException { synchronized (_connectionLock) @@ -340,7 +363,7 @@ public class ConnectionFailoverDecorator extends AbstractConnectionDecorator // TODO add local IP and pid to the beginning; return _ssnNameGenerator.generate().toString(); } - + // Suppresses the exceptions private void closeInternal() { @@ -353,5 +376,5 @@ public class ConnectionFailoverDecorator extends AbstractConnectionDecorator //ignore } } - + } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java index 134d9ca808..65bcef97e5 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java @@ -29,15 +29,46 @@ import org.apache.qpid.messaging.internal.ConnectionEventListener; import org.apache.qpid.messaging.internal.ReceiverInternal; import org.apache.qpid.messaging.internal.SessionInternal; import org.apache.qpid.messaging.util.AbstractReceiverDecorator; +import org.apache.qpid.messaging.util.failover.SessionFailoverDecorator.SessionState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A Decorator that adds basic housekeeping tasks to a Receiver. + *

A Decorator that adds failover and basic housekeeping tasks to a Sender. * This class adds, - * 1. State management. - * 2. Exception handling. + *

    + *
  1. Failover support.
  2. + *
  3. State management.
  4. + *
  5. Exception handling.
  6. + *
  7. Failover
  8. + *

* + *

Exception Handling
+ * This class will wrap each method call to it's delegate to handle error situations. + * First it will check if the Receiver is already CLOSED or FAILOVER_IN_PROGRESS state. + * If latter it will wait until the Receiver is moved to OPENED, CLOSED or the timer expires. + * For the last two cases a ReceiverException will be thrown and the Receiver closed.

+ * + *

TransportFailureException
+ * This class intercepts TransportFailureExceptions and are passed onto the session, + * via the exception() method, which in turn passes into the connection. + * The Receiver will be marked as FAILOVER_IN_PROGRESS and the "operation" will be + * blocked until the exception() on the Session object returns. At this point + * the Receiver is either moved to OPENED or CLOSED.

+ * + *

SessionException
+ * For the time being, anytime a session exception is received, the Receiver will be marked CLOSED. + * We need to revisit this.

+ * + *

Close() can be called by, + *

    + *
  1. The application (normal close).
  2. + *
  3. By the session object, if close is called on it.(normal close)
  4. + *
  5. By the connection object, if close is called on it.(normal close)
  6. + *
  7. By the connection object, if failover was unsuccessful(error)
  8. + *
  9. By itself (via the session) if it receives and exception (error).
  10. + *
+ *

*/ public class ReceiverFailoverDecorator extends AbstractReceiverDecorator implements ConnectionEventListener { @@ -214,6 +245,7 @@ public class ReceiverFailoverDecorator extends AbstractReceiverDecorator impleme { _connSerialNumber = _ssn.getConnectionInternal().getSerialNumber(); _delegate.recreate(); + _state = ReceiverState.OPENED; } } @@ -283,6 +315,11 @@ public class ReceiverFailoverDecorator extends AbstractReceiverDecorator impleme { throw new ReceiverException("Receiver is closed. Failover was unsuccesfull",_lastException); } + else if (_state == ReceiverState.FAILOVER_IN_PROGRESS) + { + closeInternal(); + throw new ReceiverException("Receiver is closed. Failover did not complete on time"); + } } } @@ -309,4 +346,17 @@ public class ReceiverFailoverDecorator extends AbstractReceiverDecorator impleme waitForFailoverToComplete(); } } + + /** Suppress Exceptions as */ + private void closeInternal() + { + try + { + close(); + } + catch (Exception e) + { + //ignore + } + } } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java index 79f87db4d5..ae9296ab0b 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java @@ -3,7 +3,6 @@ package org.apache.qpid.messaging.util.failover; import org.apache.qpid.messaging.ConnectionException; import org.apache.qpid.messaging.Message; import org.apache.qpid.messaging.MessagingException; -import org.apache.qpid.messaging.ReceiverException; import org.apache.qpid.messaging.SenderException; import org.apache.qpid.messaging.Session; import org.apache.qpid.messaging.SessionException; @@ -13,16 +12,46 @@ import org.apache.qpid.messaging.internal.ConnectionEventListener; import org.apache.qpid.messaging.internal.SenderInternal; import org.apache.qpid.messaging.internal.SessionInternal; import org.apache.qpid.messaging.util.AbstractSenderDecorator; +import org.apache.qpid.messaging.util.failover.SessionFailoverDecorator.SessionState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A Decorator that adds basic housekeeping tasks to a Sender. + *

A Decorator that adds failover and basic housekeeping tasks to a Sender. * This class adds, - * 1. State management. - * 2. Exception handling. - * 3. Failover + *

    + *
  1. Failover support.
  2. + *
  3. State management.
  4. + *
  5. Exception handling.
  6. + *
  7. Failover
  8. + *

* + *

Exception Handling
+ * This class will wrap each method call to it's delegate to handle error situations. + * First it will check if the Receiver is already CLOSED or FAILOVER_IN_PROGRESS state. + * If latter it will wait until the Sender is moved to OPENED, CLOSED or the timer expires. + * For the last two cases a SenderException will be thrown and the Sender closed.

+ * + *

TransportFailureException
+ * This class intercepts TransportFailureExceptions and are passed onto the session, + * via the exception() method, which in turn passes into the connection. + * The Sender will be marked as FAILOVER_IN_PROGRESS and the "operation" will be + * blocked until the exception() on the Session object returns. At this point + * the Sender is either moved to OPENED or CLOSED.

+ * + *

SessionException
+ * For the time being, anytime a session exception is received, the Sender will be marked CLOSED. + * We need to revisit this.

+ * + *

Close() can be called by, + *

    + *
  1. The application (normal close).
  2. + *
  3. By the session object, if close is called on it.(normal close)
  4. + *
  5. By the connection object, if close is called on it.(normal close)
  6. + *
  7. By the connection object, if failover was unsuccessful(error)
  8. + *
  9. By itself (via the session) if it receives and exception (error).
  10. + *
+ *

*/ public class SenderFailoverDecorator extends AbstractSenderDecorator implements ConnectionEventListener { @@ -32,7 +61,7 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements private SenderState _state = SenderState.OPENED; private long _failoverTimeout = Long.getLong("qpid.failover-timeout", 1000); - private ReceiverException _lastException; + private SenderException _lastException; private long _connSerialNumber = 0; public SenderFailoverDecorator(SessionInternal ssn, SenderInternal delegate) @@ -186,6 +215,7 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements { _connSerialNumber = _ssn.getConnectionInternal().getSerialNumber(); _delegate.recreate(); + _state = SenderState.OPENED; } } @@ -213,7 +243,7 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements } catch (MessagingException e) { - _logger.warn("Exception when trying to close the receiver", e); + _logger.warn("Exception when trying to close the Sender", e); } _connectionLock.notifyAll(); break; @@ -242,7 +272,12 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements } if (_state == SenderState.CLOSED) { - throw new SenderException("Receiver is closed. Failover was unsuccesfull",_lastException); + throw new SenderException("Sender is closed. Failover was unsuccesfull",_lastException); + } + else if (_state == SenderState.FAILOVER_IN_PROGRESS) + { + closeInternal(); + throw new SenderException("Sender is closed. Failover did not complete on time"); } } } @@ -283,9 +318,22 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements { synchronized (_connectionLock) { - // This should close all senders (including this) and receivers. + // This should close all senders (including this) and Senders. _ssn.exception(e); } return new SenderException("Session has been closed",e); } + + /** Suppress Exceptions as */ + private void closeInternal() + { + try + { + close(); + } + catch (Exception e) + { + //ignore + } + } } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java index 56944ca148..d49f681dd3 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java @@ -35,6 +35,41 @@ import org.apache.qpid.messaging.util.AbstractSessionDecorator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + *

A Decorator that adds failover and basic housekeeping tasks to a session. + * This class adds, + *

    + *
  1. Failover support.
  2. + *
  3. Management of receivers and senders created by this session.
  4. + *
  5. State management.
  6. + *
  7. Exception handling.
  8. + *

+ * + *

Exception Handling
+ * This class will wrap each method call to it's delegate to handle error situations. + * First it will check if the session is already CLOSED or FAILOVER_IN_PROGRESS state. + * If latter it will wait until the Session is moved to OPENED, CLOSED or the timer expires. + * For the last two cases a SessionException will be thrown and the Session closed.

+ * + *

TransportFailureException
+ * This class intercepts TransportFailureExceptions and are passed onto the connection. + * The Session will be marked as FAILOVER_IN_PROGRESS and the "operation" will be + * blocked until the exception() on the Connection object returns. At this point + * the Session is either moved to OPENED or CLOSED.

+ * + *

SessionException
+ * For the time being, anytime a session exception is received, the session will be marked CLOSED. + * We need to revisit this.

+ * + *

Close() can be called by, + *

    + *
  1. The application (normal close).
  2. + *
  3. By the connection object, if close is called on it.(normal close)
  4. + *
  5. By the connection object, if failover was unsuccessful(error)
  6. + *
  7. By itself if it receives and exception (error).
  8. + *
+ *

+ */ public class SessionFailoverDecorator extends AbstractSessionDecorator implements ConnectionEventListener { private static Logger _logger = LoggerFactory.getLogger(SessionFailoverDecorator.class); @@ -441,6 +476,7 @@ public class SessionFailoverDecorator extends AbstractSessionDecorator implement { sender.recreate(); } + _state = SessionState.OPENED; } } @@ -528,6 +564,7 @@ public class SessionFailoverDecorator extends AbstractSessionDecorator implement { try { + _lastException = e; close(); } catch(MessagingException ex) @@ -552,8 +589,26 @@ public class SessionFailoverDecorator extends AbstractSessionDecorator implement } if (_state == SessionState.CLOSED) { - throw new SessionException("Session is closed. Failover was unsuccesfull",_lastException); + throw new SessionException("Session is closed. Failover was unsuccesfull"); + } + else if (_state == SessionState.FAILOVER_IN_PROGRESS) + { + closeInternal(); + throw new SessionException("Session is closed. Failover did not complete on time"); } } } + + /** Suppress Exceptions as */ + private void closeInternal() + { + try + { + close(); + } + catch (Exception e) + { + //ignore + } + } } -- cgit v1.2.1