diff options
Diffstat (limited to 'java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java')
-rw-r--r-- | java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java | 644 |
1 files changed, 332 insertions, 312 deletions
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java index aea25a403b..1d6541ec3e 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java @@ -40,7 +40,10 @@ import org.apache.qpid.framing.ConnectionTuneOkBody; import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; import org.apache.qpid.nclient.amqp.state.AMQPState; +import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent; import org.apache.qpid.nclient.amqp.state.AMQPStateMachine; +import org.apache.qpid.nclient.amqp.state.AMQPStateManager; +import org.apache.qpid.nclient.amqp.state.AMQPStateType; import org.apache.qpid.nclient.config.ClientConfiguration; import org.apache.qpid.nclient.core.AMQPException; import org.apache.qpid.nclient.core.Phase; @@ -55,366 +58,383 @@ import org.apache.qpid.nclient.util.AMQPValidator; */ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListener { - private static final Logger _logger = Logger.getLogger(AMQPConnection.class); + private static final Logger _logger = Logger.getLogger(AMQPConnection.class); - private Phase _phase; + private Phase _phase; - private TransportConnection _connection; + private TransportConnection _connection; - private long _correlationId; + private long _correlationId; - private AMQPState _currentState; + private AMQPState _currentState; - private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE, - AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED, AMQPState.CONNECTION_OPEN, }; + private AMQPStateManager _stateManager; - // The wait period until a server sends a respond - private long _serverTimeOut = 1000; + private final AMQPState[] _validCloseStates = new AMQPState[] + { AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE, AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED, + AMQPState.CONNECTION_OPEN, }; - private final Lock _lock = new ReentrantLock(); + // The wait period until a server sends a respond + private long _serverTimeOut = 1000; - private final Condition _connectionNotStarted = _lock.newCondition(); + private final Lock _lock = new ReentrantLock(); - private final Condition _connectionNotSecure = _lock.newCondition(); + private final Condition _connectionNotStarted = _lock.newCondition(); - private final Condition _connectionNotTuned = _lock.newCondition(); + private final Condition _connectionNotSecure = _lock.newCondition(); - private final Condition _connectionNotOpened = _lock.newCondition(); + private final Condition _connectionNotTuned = _lock.newCondition(); - private final Condition _connectionNotClosed = _lock.newCondition(); + private final Condition _connectionNotOpened = _lock.newCondition(); - private ConnectionStartBody _connectionStartBody; + private final Condition _connectionNotClosed = _lock.newCondition(); - private ConnectionSecureBody _connectionSecureBody; + private ConnectionStartBody _connectionStartBody; - private ConnectionTuneBody _connectionTuneBody; + private ConnectionSecureBody _connectionSecureBody; - private ConnectionOpenOkBody _connectionOpenOkBody; + private ConnectionTuneBody _connectionTuneBody; - private ConnectionCloseOkBody _connectionCloseOkBody; - - private ConnectionCloseBody _connectionCloseBody; + private ConnectionOpenOkBody _connectionOpenOkBody; - protected AMQPConnection(TransportConnection connection) - { - _connection = connection; - _currentState = AMQPState.CONNECTION_UNDEFINED; - _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); - } + private ConnectionCloseOkBody _connectionCloseOkBody; - /** - * ------------------------------------------- - * API Methods - * -------------------------------------------- - */ + private ConnectionCloseBody _connectionCloseBody; - /** - * Opens the TCP connection and let the formalities begin. - */ - public ConnectionStartBody openTCPConnection() throws AMQPException - { - _lock.lock(); - // open the TCP connection - try + protected AMQPConnection(TransportConnection connection, AMQPStateManager stateManager) { - _connectionStartBody = null; - checkIfValidStateTransition(AMQPState.CONNECTION_UNDEFINED, _currentState, AMQPState.CONNECTION_NOT_STARTED); - _phase = _connection.connect(); - - // waiting for ConnectionStartBody or error in connection - //_connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _connectionNotStarted.await(); - - checkIfConnectionClosed(); - AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time"); - _currentState = AMQPState.CONNECTION_NOT_STARTED; - return _connectionStartBody; + _connection = connection; + _stateManager = stateManager; + _currentState = AMQPState.CONNECTION_UNDEFINED; + _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); } - catch (InterruptedException e) - { - throw new AMQPException("Error opening connection to broker", e); - } - finally - { - _lock.unlock(); - } - } - - /** - * The current java broker implementation can send a connection tune body - * as a response to the startOk. Not sure if that is the correct behaviour. - */ - public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException - { - _lock.lock(); - try - { - _connectionSecureBody = null; - checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE); - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId); - _phase.messageSent(msg); - //_connectionNotSecure.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _connectionNotSecure.await(); - //AMQPValidator.throwExceptionOnNull(_connectionSecureBody, "The broker didn't send the ConnectionSecureBody in time"); - //_currentState = AMQPState.CONNECTION_NOT_SECURE; - - checkIfConnectionClosed(); - if (_connectionTuneBody != null) - { - _currentState = AMQPState.CONNECTION_NOT_TUNED; - return _connectionTuneBody; - } - else if (_connectionSecureBody != null) - { // oops the server sent another challenge - _currentState = AMQPState.CONNECTION_NOT_SECURE; - return _connectionSecureBody; - } - else - { - throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time"); - } - } - catch (InterruptedException e) - { - throw new AMQPException("Error in connection.startOk", e); - } - finally - { - _lock.unlock(); - } - } - - /** - * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could - * issue a new challenge - */ - public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException - { - _lock.lock(); - try - { - _connectionTuneBody = null; - _connectionSecureBody = null; - checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED); - - _connectionSecureBody = null; // The server could send a fresh challenge - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId); - _phase.messageSent(msg); - - //_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _connectionNotTuned.await(); - checkIfConnectionClosed(); - - if (_connectionTuneBody != null) - { - _currentState = AMQPState.CONNECTION_NOT_TUNED; - return _connectionTuneBody; - } - else if (_connectionSecureBody != null) - { // oops the server sent another challenge - _currentState = AMQPState.CONNECTION_NOT_SECURE; - return _connectionSecureBody; - } - else - { - throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time"); - } - } - catch (InterruptedException e) - { - throw new AMQPException("Error in connection.secureOk", e); - } - finally - { - _lock.unlock(); - } - } - public void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException - { - _lock.lock(); - try - { - checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, AMQPState.CONNECTION_NOT_OPENED); - _connectionSecureBody = null; - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId); - _phase.messageSent(msg); - _currentState = AMQPState.CONNECTION_NOT_OPENED; - } - finally - { - _lock.unlock(); - } - } + /** + * ------------------------------------------- + * API Methods + * -------------------------------------------- + */ - public ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException - { - _lock.lock(); - try + /** + * Opens the TCP connection and let the formalities begin. + */ + public ConnectionStartBody openTCPConnection() throws AMQPException { - // If the broker sends a connection close due to an error with the - // Connection tune ok, then this call will verify that - checkIfConnectionClosed(); - - _connectionOpenOkBody = null; - checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN); - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - - //_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _connectionNotOpened.await(); - - checkIfConnectionClosed(); - AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time"); - _currentState = AMQPState.CONNECTION_OPEN; - return _connectionOpenOkBody; + _lock.lock(); + // open the TCP connection + try + { + _connectionStartBody = null; + checkIfValidStateTransition(AMQPState.CONNECTION_UNDEFINED, _currentState, AMQPState.CONNECTION_NOT_STARTED); + _phase = _connection.connect(); + + // waiting for ConnectionStartBody or error in connection + //_connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _connectionNotStarted.await(); + + checkIfConnectionClosed(); + AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time"); + notifyState(AMQPState.CONNECTION_NOT_STARTED); + _currentState = AMQPState.CONNECTION_NOT_STARTED; + return _connectionStartBody; + } + catch (InterruptedException e) + { + throw new AMQPException("Error opening connection to broker", e); + } + finally + { + _lock.unlock(); + } } - catch (InterruptedException e) - { - throw new AMQPException("Error in connection.open", e); - } - finally - { - _lock.unlock(); - } - } - public ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException - { - _lock.lock(); - try + /** + * The current java broker implementation can send a connection tune body + * as a response to the startOk. Not sure if that is the correct behaviour. + */ + public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException { - _connectionCloseOkBody = null; - checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED); - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS); - AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time"); - _currentState = AMQPState.CONNECTION_CLOSED; - return _connectionCloseOkBody; + _lock.lock(); + try + { + _connectionSecureBody = null; + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE); + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId); + _phase.messageSent(msg); + // _connectionNotSecure.await(_serverTimeOut,TimeUnit.MILLISECONDS); + _connectionNotSecure.await(); + + checkIfConnectionClosed(); + if (_connectionTuneBody != null) + { + notifyState(AMQPState.CONNECTION_NOT_TUNED); + _currentState = AMQPState.CONNECTION_NOT_TUNED; + return _connectionTuneBody; + } + else if (_connectionSecureBody != null) + { // oops the server sent another challenge + notifyState(AMQPState.CONNECTION_NOT_SECURE); + _currentState = AMQPState.CONNECTION_NOT_SECURE; + return _connectionSecureBody; + } + else + { + throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time"); + } + } + catch (InterruptedException e) + { + throw new AMQPException("Error in connection.startOk", e); + } + finally + { + _lock.unlock(); + } } - catch (InterruptedException e) + + /** + * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could + * issue a new challenge + */ + public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException { - throw new AMQPException("Error in connection.close", e); + _lock.lock(); + try + { + _connectionTuneBody = null; + _connectionSecureBody = null; + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED); + + _connectionSecureBody = null; // The server could send a fresh challenge + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId); + _phase.messageSent(msg); + + //_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _connectionNotTuned.await(); + checkIfConnectionClosed(); + + if (_connectionTuneBody != null) + { + notifyState(AMQPState.CONNECTION_NOT_TUNED); + _currentState = AMQPState.CONNECTION_NOT_TUNED; + return _connectionTuneBody; + } + else if (_connectionSecureBody != null) + { // oops the server sent another challenge + notifyState(AMQPState.CONNECTION_NOT_SECURE); + _currentState = AMQPState.CONNECTION_NOT_SECURE; + return _connectionSecureBody; + } + else + { + throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time"); + } + } + catch (InterruptedException e) + { + throw new AMQPException("Error in connection.secureOk", e); + } + finally + { + _lock.unlock(); + } } - finally + + public void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException { - _lock.unlock(); + _lock.lock(); + try + { + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, AMQPState.CONNECTION_NOT_OPENED); + _connectionSecureBody = null; + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId); + _phase.messageSent(msg); + notifyState(AMQPState.CONNECTION_NOT_OPENED); + _currentState = AMQPState.CONNECTION_NOT_OPENED; + } + finally + { + _lock.unlock(); + } } - } - - /** - * ------------------------------------------- AMQMethodListener methods - * -------------------------------------------- - */ - public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException - { - _lock.lock(); - try + + public ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException { - _correlationId = evt.getCorrelationId(); - - if (evt.getMethod() instanceof ConnectionStartBody) - { - _connectionStartBody = (ConnectionStartBody) evt.getMethod(); - _connectionNotStarted.signalAll(); - return true; - } - else if (evt.getMethod() instanceof ConnectionSecureBody) - { - _connectionSecureBody = (ConnectionSecureBody) evt.getMethod(); - _connectionNotSecure.signal(); - _connectionNotTuned.signal(); // in case the server has sent another chanllenge - return true; - } - else if (evt.getMethod() instanceof ConnectionTuneBody) - { - _connectionTuneBody = (ConnectionTuneBody) evt.getMethod(); - _connectionNotSecure.signal(); //if the server does the auth with ConntectionStartOk - _connectionNotTuned.signal(); - return true; - } - else if (evt.getMethod() instanceof ConnectionOpenOkBody) - { - _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod(); - _connectionNotOpened.signal(); - return true; - } - else if (evt.getMethod() instanceof ConnectionCloseOkBody) - { - _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod(); - _connectionNotClosed.signal(); - return true; - } - else if (evt.getMethod() instanceof ConnectionCloseBody) - { - _connectionCloseBody = (ConnectionCloseBody)evt.getMethod(); - // release the correct lock as u may have some conditions waiting. - // while an error occured and the broker has sent a close. - releaseLocks(); - handleClose(); - return true; - } - else - { - return false; - } + _lock.lock(); + try + { + // If the broker sends a connection close due to an error with the + // Connection tune ok, then this call will verify that + checkIfConnectionClosed(); + + _connectionOpenOkBody = null; + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN); + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _connectionNotOpened.await(); + + checkIfConnectionClosed(); + AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time"); + notifyState(AMQPState.CONNECTION_OPEN); + _currentState = AMQPState.CONNECTION_OPEN; + return _connectionOpenOkBody; + } + catch (InterruptedException e) + { + throw new AMQPException("Error in connection.open", e); + } + finally + { + _lock.unlock(); + } } - finally + + public ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException { - _lock.unlock(); - } - } - - private void handleClose() throws AMQPException - { - try - { - _currentState = AMQPState.CONNECTION_CLOSING; - // do the required cleanup and send a ConnectionCloseOkBody + _lock.lock(); + try + { + _connectionCloseOkBody = null; + checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED); + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS); + AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time"); + notifyState(AMQPState.CONNECTION_CLOSED); + _currentState = AMQPState.CONNECTION_CLOSED; + return _connectionCloseOkBody; + } + catch (InterruptedException e) + { + throw new AMQPException("Error in connection.close", e); + } + finally + { + _lock.unlock(); + } } - catch (Exception e) + + /** + * ------------------------------------------- + * AMQMethodListener methods + * -------------------------------------------- + */ + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException { - throw new AMQPException("Error handling connection.close from broker", e); + _lock.lock(); + try + { + _correlationId = evt.getCorrelationId(); + + if (evt.getMethod() instanceof ConnectionStartBody) + { + _connectionStartBody = (ConnectionStartBody) evt.getMethod(); + _connectionNotStarted.signalAll(); + return true; + } + else if (evt.getMethod() instanceof ConnectionSecureBody) + { + _connectionSecureBody = (ConnectionSecureBody) evt.getMethod(); + _connectionNotSecure.signal(); + _connectionNotTuned.signal(); // in case the server has sent another chanllenge + return true; + } + else if (evt.getMethod() instanceof ConnectionTuneBody) + { + _connectionTuneBody = (ConnectionTuneBody) evt.getMethod(); + _connectionNotSecure.signal(); //if the server does the auth with ConntectionStartOk + _connectionNotTuned.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionOpenOkBody) + { + _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod(); + _connectionNotOpened.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionCloseOkBody) + { + _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod(); + _connectionNotClosed.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionCloseBody) + { + _connectionCloseBody = (ConnectionCloseBody) evt.getMethod(); + // release the correct lock as u may have some conditions waiting. + // while an error occured and the broker has sent a close. + releaseLocks(); + handleClose(); + return true; + } + else + { + return false; + } + } + finally + { + _lock.unlock(); + } } - } - - private void checkIfConnectionClosed()throws AMQPException - { - if (_connectionCloseBody != null) + + + public Phase getPhasePipe() { - String error = "Broker has closed connection due to : " + _connectionCloseBody.getReplyText() + - " with reply code (" + _connectionCloseBody.getReplyCode() + ") " + - "caused by class " + _connectionCloseBody.getClassId() + - " and method " + _connectionCloseBody.getMethod(); - - throw new AMQPException(error); + return _phase; } - } - - private void releaseLocks() - { - if(_currentState == AMQPState.CONNECTION_NOT_OPENED) + + private void handleClose() throws AMQPException { - _connectionNotOpened.signal(); + try + { + notifyState(AMQPState.CONNECTION_CLOSING); + _currentState = AMQPState.CONNECTION_CLOSING; + // do the required cleanup and send a ConnectionCloseOkBody + } + catch (Exception e) + { + throw new AMQPException("Error handling connection.close from broker", e); + } } - else if(_currentState == AMQPState.CONNECTION_UNDEFINED) + + private void checkIfConnectionClosed() throws AMQPException { - _connectionNotStarted.signal(); + if (_connectionCloseBody != null) + { + String error = "Broker has closed connection due to : " + _connectionCloseBody.getReplyText() + " with reply code (" + + _connectionCloseBody.getReplyCode() + ") " + "caused by class " + _connectionCloseBody.getClassId() + " and method " + + _connectionCloseBody.getMethod(); + + throw new AMQPException(error); + } } - else if(_currentState == AMQPState.CONNECTION_NOT_STARTED) + + private void releaseLocks() { - _connectionNotSecure.signal(); + if (_currentState == AMQPState.CONNECTION_NOT_OPENED) + { + _connectionNotOpened.signal(); + } + else if (_currentState == AMQPState.CONNECTION_UNDEFINED) + { + _connectionNotStarted.signal(); + } + else if (_currentState == AMQPState.CONNECTION_NOT_STARTED) + { + _connectionNotSecure.signal(); + } + else if (_currentState == AMQPState.CONNECTION_NOT_SECURE) + { + _connectionNotTuned.signal(); + } } - else if(_currentState == AMQPState.CONNECTION_NOT_SECURE) + + private void notifyState(AMQPState newState) throws AMQPException { - _connectionNotTuned.signal(); + _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CONNECTION_STATE)); } - } - - public Phase getPhasePipe() - { - return _phase; - } }
\ No newline at end of file |