summaryrefslogtreecommitdiff
path: root/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java')
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java644
1 files changed, 332 insertions, 312 deletions
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
index aea25a403b..1d6541ec3e 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
@@ -40,7 +40,10 @@ import org.apache.qpid.framing.ConnectionTuneOkBody;
import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
import org.apache.qpid.nclient.amqp.state.AMQPState;
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
import org.apache.qpid.nclient.config.ClientConfiguration;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.Phase;
@@ -55,366 +58,383 @@ import org.apache.qpid.nclient.util.AMQPValidator;
*/
public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListener
{
- private static final Logger _logger = Logger.getLogger(AMQPConnection.class);
+ private static final Logger _logger = Logger.getLogger(AMQPConnection.class);
- private Phase _phase;
+ private Phase _phase;
- private TransportConnection _connection;
+ private TransportConnection _connection;
- private long _correlationId;
+ private long _correlationId;
- private AMQPState _currentState;
+ private AMQPState _currentState;
- private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE,
- AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED, AMQPState.CONNECTION_OPEN, };
+ private AMQPStateManager _stateManager;
- // The wait period until a server sends a respond
- private long _serverTimeOut = 1000;
+ private final AMQPState[] _validCloseStates = new AMQPState[]
+ { AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE, AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED,
+ AMQPState.CONNECTION_OPEN, };
- private final Lock _lock = new ReentrantLock();
+ // The wait period until a server sends a respond
+ private long _serverTimeOut = 1000;
- private final Condition _connectionNotStarted = _lock.newCondition();
+ private final Lock _lock = new ReentrantLock();
- private final Condition _connectionNotSecure = _lock.newCondition();
+ private final Condition _connectionNotStarted = _lock.newCondition();
- private final Condition _connectionNotTuned = _lock.newCondition();
+ private final Condition _connectionNotSecure = _lock.newCondition();
- private final Condition _connectionNotOpened = _lock.newCondition();
+ private final Condition _connectionNotTuned = _lock.newCondition();
- private final Condition _connectionNotClosed = _lock.newCondition();
+ private final Condition _connectionNotOpened = _lock.newCondition();
- private ConnectionStartBody _connectionStartBody;
+ private final Condition _connectionNotClosed = _lock.newCondition();
- private ConnectionSecureBody _connectionSecureBody;
+ private ConnectionStartBody _connectionStartBody;
- private ConnectionTuneBody _connectionTuneBody;
+ private ConnectionSecureBody _connectionSecureBody;
- private ConnectionOpenOkBody _connectionOpenOkBody;
+ private ConnectionTuneBody _connectionTuneBody;
- private ConnectionCloseOkBody _connectionCloseOkBody;
-
- private ConnectionCloseBody _connectionCloseBody;
+ private ConnectionOpenOkBody _connectionOpenOkBody;
- protected AMQPConnection(TransportConnection connection)
- {
- _connection = connection;
- _currentState = AMQPState.CONNECTION_UNDEFINED;
- _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
- }
+ private ConnectionCloseOkBody _connectionCloseOkBody;
- /**
- * -------------------------------------------
- * API Methods
- * --------------------------------------------
- */
+ private ConnectionCloseBody _connectionCloseBody;
- /**
- * Opens the TCP connection and let the formalities begin.
- */
- public ConnectionStartBody openTCPConnection() throws AMQPException
- {
- _lock.lock();
- // open the TCP connection
- try
+ protected AMQPConnection(TransportConnection connection, AMQPStateManager stateManager)
{
- _connectionStartBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_UNDEFINED, _currentState, AMQPState.CONNECTION_NOT_STARTED);
- _phase = _connection.connect();
-
- // waiting for ConnectionStartBody or error in connection
- //_connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _connectionNotStarted.await();
-
- checkIfConnectionClosed();
- AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time");
- _currentState = AMQPState.CONNECTION_NOT_STARTED;
- return _connectionStartBody;
+ _connection = connection;
+ _stateManager = stateManager;
+ _currentState = AMQPState.CONNECTION_UNDEFINED;
+ _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
}
- catch (InterruptedException e)
- {
- throw new AMQPException("Error opening connection to broker", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /**
- * The current java broker implementation can send a connection tune body
- * as a response to the startOk. Not sure if that is the correct behaviour.
- */
- public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _connectionSecureBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE);
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId);
- _phase.messageSent(msg);
- //_connectionNotSecure.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _connectionNotSecure.await();
- //AMQPValidator.throwExceptionOnNull(_connectionSecureBody, "The broker didn't send the ConnectionSecureBody in time");
- //_currentState = AMQPState.CONNECTION_NOT_SECURE;
-
- checkIfConnectionClosed();
- if (_connectionTuneBody != null)
- {
- _currentState = AMQPState.CONNECTION_NOT_TUNED;
- return _connectionTuneBody;
- }
- else if (_connectionSecureBody != null)
- { // oops the server sent another challenge
- _currentState = AMQPState.CONNECTION_NOT_SECURE;
- return _connectionSecureBody;
- }
- else
- {
- throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
- }
- }
- catch (InterruptedException e)
- {
- throw new AMQPException("Error in connection.startOk", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /**
- * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could
- * issue a new challenge
- */
- public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _connectionTuneBody = null;
- _connectionSecureBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED);
-
- _connectionSecureBody = null; // The server could send a fresh challenge
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId);
- _phase.messageSent(msg);
-
- //_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _connectionNotTuned.await();
- checkIfConnectionClosed();
-
- if (_connectionTuneBody != null)
- {
- _currentState = AMQPState.CONNECTION_NOT_TUNED;
- return _connectionTuneBody;
- }
- else if (_connectionSecureBody != null)
- { // oops the server sent another challenge
- _currentState = AMQPState.CONNECTION_NOT_SECURE;
- return _connectionSecureBody;
- }
- else
- {
- throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
- }
- }
- catch (InterruptedException e)
- {
- throw new AMQPException("Error in connection.secureOk", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
- public void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, AMQPState.CONNECTION_NOT_OPENED);
- _connectionSecureBody = null;
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId);
- _phase.messageSent(msg);
- _currentState = AMQPState.CONNECTION_NOT_OPENED;
- }
- finally
- {
- _lock.unlock();
- }
- }
+ /**
+ * -------------------------------------------
+ * API Methods
+ * --------------------------------------------
+ */
- public ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException
- {
- _lock.lock();
- try
+ /**
+ * Opens the TCP connection and let the formalities begin.
+ */
+ public ConnectionStartBody openTCPConnection() throws AMQPException
{
- // If the broker sends a connection close due to an error with the
- // Connection tune ok, then this call will verify that
- checkIfConnectionClosed();
-
- _connectionOpenOkBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN);
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
-
- //_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _connectionNotOpened.await();
-
- checkIfConnectionClosed();
- AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time");
- _currentState = AMQPState.CONNECTION_OPEN;
- return _connectionOpenOkBody;
+ _lock.lock();
+ // open the TCP connection
+ try
+ {
+ _connectionStartBody = null;
+ checkIfValidStateTransition(AMQPState.CONNECTION_UNDEFINED, _currentState, AMQPState.CONNECTION_NOT_STARTED);
+ _phase = _connection.connect();
+
+ // waiting for ConnectionStartBody or error in connection
+ //_connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _connectionNotStarted.await();
+
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time");
+ notifyState(AMQPState.CONNECTION_NOT_STARTED);
+ _currentState = AMQPState.CONNECTION_NOT_STARTED;
+ return _connectionStartBody;
+ }
+ catch (InterruptedException e)
+ {
+ throw new AMQPException("Error opening connection to broker", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
- catch (InterruptedException e)
- {
- throw new AMQPException("Error in connection.open", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
- public ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException
- {
- _lock.lock();
- try
+ /**
+ * The current java broker implementation can send a connection tune body
+ * as a response to the startOk. Not sure if that is the correct behaviour.
+ */
+ public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException
{
- _connectionCloseOkBody = null;
- checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED);
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
- _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time");
- _currentState = AMQPState.CONNECTION_CLOSED;
- return _connectionCloseOkBody;
+ _lock.lock();
+ try
+ {
+ _connectionSecureBody = null;
+ checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE);
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId);
+ _phase.messageSent(msg);
+ // _connectionNotSecure.await(_serverTimeOut,TimeUnit.MILLISECONDS);
+ _connectionNotSecure.await();
+
+ checkIfConnectionClosed();
+ if (_connectionTuneBody != null)
+ {
+ notifyState(AMQPState.CONNECTION_NOT_TUNED);
+ _currentState = AMQPState.CONNECTION_NOT_TUNED;
+ return _connectionTuneBody;
+ }
+ else if (_connectionSecureBody != null)
+ { // oops the server sent another challenge
+ notifyState(AMQPState.CONNECTION_NOT_SECURE);
+ _currentState = AMQPState.CONNECTION_NOT_SECURE;
+ return _connectionSecureBody;
+ }
+ else
+ {
+ throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new AMQPException("Error in connection.startOk", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
- catch (InterruptedException e)
+
+ /**
+ * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could
+ * issue a new challenge
+ */
+ public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException
{
- throw new AMQPException("Error in connection.close", e);
+ _lock.lock();
+ try
+ {
+ _connectionTuneBody = null;
+ _connectionSecureBody = null;
+ checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED);
+
+ _connectionSecureBody = null; // The server could send a fresh challenge
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId);
+ _phase.messageSent(msg);
+
+ //_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _connectionNotTuned.await();
+ checkIfConnectionClosed();
+
+ if (_connectionTuneBody != null)
+ {
+ notifyState(AMQPState.CONNECTION_NOT_TUNED);
+ _currentState = AMQPState.CONNECTION_NOT_TUNED;
+ return _connectionTuneBody;
+ }
+ else if (_connectionSecureBody != null)
+ { // oops the server sent another challenge
+ notifyState(AMQPState.CONNECTION_NOT_SECURE);
+ _currentState = AMQPState.CONNECTION_NOT_SECURE;
+ return _connectionSecureBody;
+ }
+ else
+ {
+ throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new AMQPException("Error in connection.secureOk", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
- finally
+
+ public void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException
{
- _lock.unlock();
+ _lock.lock();
+ try
+ {
+ checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, AMQPState.CONNECTION_NOT_OPENED);
+ _connectionSecureBody = null;
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId);
+ _phase.messageSent(msg);
+ notifyState(AMQPState.CONNECTION_NOT_OPENED);
+ _currentState = AMQPState.CONNECTION_NOT_OPENED;
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
- }
-
- /**
- * ------------------------------------------- AMQMethodListener methods
- * --------------------------------------------
- */
- public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
- {
- _lock.lock();
- try
+
+ public ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException
{
- _correlationId = evt.getCorrelationId();
-
- if (evt.getMethod() instanceof ConnectionStartBody)
- {
- _connectionStartBody = (ConnectionStartBody) evt.getMethod();
- _connectionNotStarted.signalAll();
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionSecureBody)
- {
- _connectionSecureBody = (ConnectionSecureBody) evt.getMethod();
- _connectionNotSecure.signal();
- _connectionNotTuned.signal(); // in case the server has sent another chanllenge
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionTuneBody)
- {
- _connectionTuneBody = (ConnectionTuneBody) evt.getMethod();
- _connectionNotSecure.signal(); //if the server does the auth with ConntectionStartOk
- _connectionNotTuned.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionOpenOkBody)
- {
- _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod();
- _connectionNotOpened.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionCloseOkBody)
- {
- _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod();
- _connectionNotClosed.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionCloseBody)
- {
- _connectionCloseBody = (ConnectionCloseBody)evt.getMethod();
- // release the correct lock as u may have some conditions waiting.
- // while an error occured and the broker has sent a close.
- releaseLocks();
- handleClose();
- return true;
- }
- else
- {
- return false;
- }
+ _lock.lock();
+ try
+ {
+ // If the broker sends a connection close due to an error with the
+ // Connection tune ok, then this call will verify that
+ checkIfConnectionClosed();
+
+ _connectionOpenOkBody = null;
+ checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN);
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _connectionNotOpened.await();
+
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time");
+ notifyState(AMQPState.CONNECTION_OPEN);
+ _currentState = AMQPState.CONNECTION_OPEN;
+ return _connectionOpenOkBody;
+ }
+ catch (InterruptedException e)
+ {
+ throw new AMQPException("Error in connection.open", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
- finally
+
+ public ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException
{
- _lock.unlock();
- }
- }
-
- private void handleClose() throws AMQPException
- {
- try
- {
- _currentState = AMQPState.CONNECTION_CLOSING;
- // do the required cleanup and send a ConnectionCloseOkBody
+ _lock.lock();
+ try
+ {
+ _connectionCloseOkBody = null;
+ checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+ _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time");
+ notifyState(AMQPState.CONNECTION_CLOSED);
+ _currentState = AMQPState.CONNECTION_CLOSED;
+ return _connectionCloseOkBody;
+ }
+ catch (InterruptedException e)
+ {
+ throw new AMQPException("Error in connection.close", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
- catch (Exception e)
+
+ /**
+ * -------------------------------------------
+ * AMQMethodListener methods
+ * --------------------------------------------
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
{
- throw new AMQPException("Error handling connection.close from broker", e);
+ _lock.lock();
+ try
+ {
+ _correlationId = evt.getCorrelationId();
+
+ if (evt.getMethod() instanceof ConnectionStartBody)
+ {
+ _connectionStartBody = (ConnectionStartBody) evt.getMethod();
+ _connectionNotStarted.signalAll();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionSecureBody)
+ {
+ _connectionSecureBody = (ConnectionSecureBody) evt.getMethod();
+ _connectionNotSecure.signal();
+ _connectionNotTuned.signal(); // in case the server has sent another chanllenge
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionTuneBody)
+ {
+ _connectionTuneBody = (ConnectionTuneBody) evt.getMethod();
+ _connectionNotSecure.signal(); //if the server does the auth with ConntectionStartOk
+ _connectionNotTuned.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionOpenOkBody)
+ {
+ _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod();
+ _connectionNotOpened.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionCloseOkBody)
+ {
+ _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod();
+ _connectionNotClosed.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionCloseBody)
+ {
+ _connectionCloseBody = (ConnectionCloseBody) evt.getMethod();
+ // release the correct lock as u may have some conditions waiting.
+ // while an error occured and the broker has sent a close.
+ releaseLocks();
+ handleClose();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
- }
-
- private void checkIfConnectionClosed()throws AMQPException
- {
- if (_connectionCloseBody != null)
+
+
+ public Phase getPhasePipe()
{
- String error = "Broker has closed connection due to : " + _connectionCloseBody.getReplyText() +
- " with reply code (" + _connectionCloseBody.getReplyCode() + ") " +
- "caused by class " + _connectionCloseBody.getClassId() +
- " and method " + _connectionCloseBody.getMethod();
-
- throw new AMQPException(error);
+ return _phase;
}
- }
-
- private void releaseLocks()
- {
- if(_currentState == AMQPState.CONNECTION_NOT_OPENED)
+
+ private void handleClose() throws AMQPException
{
- _connectionNotOpened.signal();
+ try
+ {
+ notifyState(AMQPState.CONNECTION_CLOSING);
+ _currentState = AMQPState.CONNECTION_CLOSING;
+ // do the required cleanup and send a ConnectionCloseOkBody
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error handling connection.close from broker", e);
+ }
}
- else if(_currentState == AMQPState.CONNECTION_UNDEFINED)
+
+ private void checkIfConnectionClosed() throws AMQPException
{
- _connectionNotStarted.signal();
+ if (_connectionCloseBody != null)
+ {
+ String error = "Broker has closed connection due to : " + _connectionCloseBody.getReplyText() + " with reply code ("
+ + _connectionCloseBody.getReplyCode() + ") " + "caused by class " + _connectionCloseBody.getClassId() + " and method "
+ + _connectionCloseBody.getMethod();
+
+ throw new AMQPException(error);
+ }
}
- else if(_currentState == AMQPState.CONNECTION_NOT_STARTED)
+
+ private void releaseLocks()
{
- _connectionNotSecure.signal();
+ if (_currentState == AMQPState.CONNECTION_NOT_OPENED)
+ {
+ _connectionNotOpened.signal();
+ }
+ else if (_currentState == AMQPState.CONNECTION_UNDEFINED)
+ {
+ _connectionNotStarted.signal();
+ }
+ else if (_currentState == AMQPState.CONNECTION_NOT_STARTED)
+ {
+ _connectionNotSecure.signal();
+ }
+ else if (_currentState == AMQPState.CONNECTION_NOT_SECURE)
+ {
+ _connectionNotTuned.signal();
+ }
}
- else if(_currentState == AMQPState.CONNECTION_NOT_SECURE)
+
+ private void notifyState(AMQPState newState) throws AMQPException
{
- _connectionNotTuned.signal();
+ _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CONNECTION_STATE));
}
- }
-
- public Phase getPhasePipe()
- {
- return _phase;
- }
} \ No newline at end of file