diff options
Diffstat (limited to 'java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java')
-rw-r--r-- | java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java | 262 |
1 files changed, 167 insertions, 95 deletions
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java index 9c9e913cd3..aea25a403b 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java @@ -37,14 +37,14 @@ import org.apache.qpid.framing.ConnectionStartBody; import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.ConnectionTuneBody; import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; import org.apache.qpid.nclient.amqp.state.AMQPState; import org.apache.qpid.nclient.amqp.state.AMQPStateMachine; import org.apache.qpid.nclient.config.ClientConfiguration; import org.apache.qpid.nclient.core.AMQPException; import org.apache.qpid.nclient.core.Phase; import org.apache.qpid.nclient.core.QpidConstants; -import org.apache.qpid.nclient.model.AMQPMethodEvent; -import org.apache.qpid.nclient.model.AMQPMethodListener; import org.apache.qpid.nclient.transport.TransportConnection; import org.apache.qpid.nclient.util.AMQPValidator; @@ -65,9 +65,8 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen private AMQPState _currentState; - private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CONNECTION_NOT_STARTED, - AMQPState.CONNECTION_NOT_SECURE, AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED, - AMQPState.CONNECTION_OPEN, }; + private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE, + AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED, AMQPState.CONNECTION_OPEN, }; // The wait period until a server sends a respond private long _serverTimeOut = 1000; @@ -93,8 +92,10 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen private ConnectionOpenOkBody _connectionOpenOkBody; private ConnectionCloseOkBody _connectionCloseOkBody; + + private ConnectionCloseBody _connectionCloseBody; - public AMQPConnection(TransportConnection connection) + protected AMQPConnection(TransportConnection connection) { _connection = connection; _currentState = AMQPState.CONNECTION_UNDEFINED; @@ -102,12 +103,14 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen } /** - * ------------------------------------------- API Methods -------------------------------------------- - */ + * ------------------------------------------- + * API Methods + * -------------------------------------------- + */ /** - * Opens the TCP connection and let the formalities begin. - */ + * Opens the TCP connection and let the formalities begin. + */ public ConnectionStartBody openTCPConnection() throws AMQPException { _lock.lock(); @@ -119,15 +122,17 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen _phase = _connection.connect(); // waiting for ConnectionStartBody or error in connection - _connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS); - AMQPValidator.throwExceptionOnNull(_connectionStartBody, - "The broker didn't send the ConnectionStartBody in time"); + //_connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _connectionNotStarted.await(); + + checkIfConnectionClosed(); + AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time"); _currentState = AMQPState.CONNECTION_NOT_STARTED; return _connectionStartBody; } - catch (Exception e) + catch (InterruptedException e) { - throw new AMQPException("XXX"); + throw new AMQPException("Error opening connection to broker", e); } finally { @@ -135,25 +140,43 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen } } - public ConnectionSecureBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException + /** + * The current java broker implementation can send a connection tune body + * as a response to the startOk. Not sure if that is the correct behaviour. + */ + public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException { _lock.lock(); try { _connectionSecureBody = null; - checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, - AMQPState.CONNECTION_NOT_SECURE); + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE); AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId); _phase.messageSent(msg); - _connectionNotSecure.await(_serverTimeOut, TimeUnit.MILLISECONDS); - AMQPValidator.throwExceptionOnNull(_connectionSecureBody, - "The broker didn't send the ConnectionSecureBody in time"); - _currentState = AMQPState.CONNECTION_NOT_SECURE; - return _connectionSecureBody; + //_connectionNotSecure.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _connectionNotSecure.await(); + //AMQPValidator.throwExceptionOnNull(_connectionSecureBody, "The broker didn't send the ConnectionSecureBody in time"); + //_currentState = AMQPState.CONNECTION_NOT_SECURE; + + checkIfConnectionClosed(); + if (_connectionTuneBody != null) + { + _currentState = AMQPState.CONNECTION_NOT_TUNED; + return _connectionTuneBody; + } + else if (_connectionSecureBody != null) + { // oops the server sent another challenge + _currentState = AMQPState.CONNECTION_NOT_SECURE; + return _connectionSecureBody; + } + else + { + throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time"); + } } - catch (Exception e) + catch (InterruptedException e) { - throw new AMQPException("XXX"); + throw new AMQPException("Error in connection.startOk", e); } finally { @@ -162,9 +185,9 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen } /** - * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could - * issue a new challenge - */ + * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could + * issue a new challenge + */ public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException { _lock.lock(); @@ -173,11 +196,15 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen _connectionTuneBody = null; _connectionSecureBody = null; checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED); + _connectionSecureBody = null; // The server could send a fresh challenge - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, - _correlationId); + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId); _phase.messageSent(msg); - _connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS); + + //_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _connectionNotTuned.await(); + checkIfConnectionClosed(); + if (_connectionTuneBody != null) { _currentState = AMQPState.CONNECTION_NOT_TUNED; @@ -193,9 +220,9 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time"); } } - catch (Exception e) + catch (InterruptedException e) { - throw new AMQPException("XXX"); + throw new AMQPException("Error in connection.secureOk", e); } finally { @@ -214,10 +241,6 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen _phase.messageSent(msg); _currentState = AMQPState.CONNECTION_NOT_OPENED; } - catch (Exception e) - { - throw new AMQPException("XXX"); - } finally { _lock.unlock(); @@ -229,20 +252,26 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen _lock.lock(); try { + // If the broker sends a connection close due to an error with the + // Connection tune ok, then this call will verify that + checkIfConnectionClosed(); + _connectionOpenOkBody = null; checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN); - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, - QpidConstants.EMPTY_CORRELATION_ID); + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID); _phase.messageSent(msg); - _connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS); - AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, - "The broker didn't send the ConnectionOpenOkBody in time"); + + //_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _connectionNotOpened.await(); + + checkIfConnectionClosed(); + AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time"); _currentState = AMQPState.CONNECTION_OPEN; return _connectionOpenOkBody; } - catch (Exception e) + catch (InterruptedException e) { - throw new AMQPException("XXX"); + throw new AMQPException("Error in connection.open", e); } finally { @@ -257,18 +286,16 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen { _connectionCloseOkBody = null; checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED); - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, - QpidConstants.EMPTY_CORRELATION_ID); + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID); _phase.messageSent(msg); _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS); - AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, - "The broker didn't send the ConnectionCloseOkBody in time"); + AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time"); _currentState = AMQPState.CONNECTION_CLOSED; return _connectionCloseOkBody; } - catch (Exception e) + catch (InterruptedException e) { - throw new AMQPException("XXX"); + throw new AMQPException("Error in connection.close", e); } finally { @@ -277,72 +304,117 @@ public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListen } /** - * ------------------------------------------- AMQMethodListener methods - * -------------------------------------------- - */ + * ------------------------------------------- AMQMethodListener methods + * -------------------------------------------- + */ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException { - _correlationId = evt.getCorrelationId(); + _lock.lock(); + try + { + _correlationId = evt.getCorrelationId(); - if (evt.getMethod() instanceof ConnectionStartBody) + if (evt.getMethod() instanceof ConnectionStartBody) + { + _connectionStartBody = (ConnectionStartBody) evt.getMethod(); + _connectionNotStarted.signalAll(); + return true; + } + else if (evt.getMethod() instanceof ConnectionSecureBody) + { + _connectionSecureBody = (ConnectionSecureBody) evt.getMethod(); + _connectionNotSecure.signal(); + _connectionNotTuned.signal(); // in case the server has sent another chanllenge + return true; + } + else if (evt.getMethod() instanceof ConnectionTuneBody) + { + _connectionTuneBody = (ConnectionTuneBody) evt.getMethod(); + _connectionNotSecure.signal(); //if the server does the auth with ConntectionStartOk + _connectionNotTuned.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionOpenOkBody) + { + _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod(); + _connectionNotOpened.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionCloseOkBody) + { + _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod(); + _connectionNotClosed.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionCloseBody) + { + _connectionCloseBody = (ConnectionCloseBody)evt.getMethod(); + // release the correct lock as u may have some conditions waiting. + // while an error occured and the broker has sent a close. + releaseLocks(); + handleClose(); + return true; + } + else + { + return false; + } + } + finally { - _connectionStartBody = (ConnectionStartBody) evt.getMethod(); - _connectionNotStarted.signal(); - return true; + _lock.unlock(); } - else if (evt.getMethod() instanceof ConnectionSecureBody) + } + + private void handleClose() throws AMQPException + { + try + { + _currentState = AMQPState.CONNECTION_CLOSING; + // do the required cleanup and send a ConnectionCloseOkBody + } + catch (Exception e) { - _connectionSecureBody = (ConnectionSecureBody) evt.getMethod(); - _connectionNotSecure.signal(); - _connectionNotTuned.signal(); // in case the server has sent another chanllenge - return true; + throw new AMQPException("Error handling connection.close from broker", e); } - else if (evt.getMethod() instanceof ConnectionTuneBody) + } + + private void checkIfConnectionClosed()throws AMQPException + { + if (_connectionCloseBody != null) { - _connectionTuneBody = (ConnectionTuneBody) evt.getMethod(); - _connectionNotTuned.signal(); - return true; + String error = "Broker has closed connection due to : " + _connectionCloseBody.getReplyText() + + " with reply code (" + _connectionCloseBody.getReplyCode() + ") " + + "caused by class " + _connectionCloseBody.getClassId() + + " and method " + _connectionCloseBody.getMethod(); + + throw new AMQPException(error); } - else if (evt.getMethod() instanceof ConnectionOpenOkBody) + } + + private void releaseLocks() + { + if(_currentState == AMQPState.CONNECTION_NOT_OPENED) { - _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod(); _connectionNotOpened.signal(); - return true; } - else if (evt.getMethod() instanceof ConnectionCloseOkBody) + else if(_currentState == AMQPState.CONNECTION_UNDEFINED) { - _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod(); - _connectionNotClosed.signal(); - return true; + _connectionNotStarted.signal(); } - else if (evt.getMethod() instanceof ConnectionCloseBody) + else if(_currentState == AMQPState.CONNECTION_NOT_STARTED) { - handleClose(); - return true; + _connectionNotSecure.signal(); } - else + else if(_currentState == AMQPState.CONNECTION_NOT_SECURE) { - return false; + _connectionNotTuned.signal(); } } - public void handleClose() throws AMQPException + public Phase getPhasePipe() { - _lock.lock(); - try - { - checkIfValidStateTransition(AMQPState.CONNECTION_OPEN, _currentState, AMQPState.CONNECTION_CLOSING); - _currentState = AMQPState.CONNECTION_CLOSING; - // do the required cleanup and send a ConnectionCloseOkBody - } - catch (Exception e) - { - throw new AMQPException("XXX"); - } - finally - { - _lock.unlock(); - } + return _phase; } }
\ No newline at end of file |