summaryrefslogtreecommitdiff
path: root/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java')
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java414
1 files changed, 7 insertions, 407 deletions
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
index 1d6541ec3e..99eb690ede 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
@@ -1,440 +1,40 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
package org.apache.qpid.nclient.amqp;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.log4j.Logger;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionCloseOkBody;
import org.apache.qpid.framing.ConnectionOpenBody;
import org.apache.qpid.framing.ConnectionOpenOkBody;
-import org.apache.qpid.framing.ConnectionSecureBody;
import org.apache.qpid.framing.ConnectionSecureOkBody;
import org.apache.qpid.framing.ConnectionStartBody;
import org.apache.qpid.framing.ConnectionStartOkBody;
-import org.apache.qpid.framing.ConnectionTuneBody;
import org.apache.qpid.framing.ConnectionTuneOkBody;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
-import org.apache.qpid.nclient.amqp.state.AMQPState;
-import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
-import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
-import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
-import org.apache.qpid.nclient.amqp.state.AMQPStateType;
-import org.apache.qpid.nclient.config.ClientConfiguration;
import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.Phase;
-import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.nclient.transport.TransportConnection;
-import org.apache.qpid.nclient.util.AMQPValidator;
-/**
- * This maps directly to the Connection class defined in the AMQP protocol This class is a finite state machine and is
- * thread safe by design A particular method (state changing) can only be invoked once and only in sequence or else an
- * IllegalStateTransitionException will be thrown Also only one thread can enter those methods at a given time.
- */
-public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListener
+public interface AMQPConnection
{
- private static final Logger _logger = Logger.getLogger(AMQPConnection.class);
-
- private Phase _phase;
-
- private TransportConnection _connection;
-
- private long _correlationId;
-
- private AMQPState _currentState;
-
- private AMQPStateManager _stateManager;
-
- private final AMQPState[] _validCloseStates = new AMQPState[]
- { AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE, AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED,
- AMQPState.CONNECTION_OPEN, };
-
- // The wait period until a server sends a respond
- private long _serverTimeOut = 1000;
-
- private final Lock _lock = new ReentrantLock();
-
- private final Condition _connectionNotStarted = _lock.newCondition();
-
- private final Condition _connectionNotSecure = _lock.newCondition();
-
- private final Condition _connectionNotTuned = _lock.newCondition();
-
- private final Condition _connectionNotOpened = _lock.newCondition();
-
- private final Condition _connectionNotClosed = _lock.newCondition();
-
- private ConnectionStartBody _connectionStartBody;
-
- private ConnectionSecureBody _connectionSecureBody;
-
- private ConnectionTuneBody _connectionTuneBody;
-
- private ConnectionOpenOkBody _connectionOpenOkBody;
-
- private ConnectionCloseOkBody _connectionCloseOkBody;
-
- private ConnectionCloseBody _connectionCloseBody;
-
- protected AMQPConnection(TransportConnection connection, AMQPStateManager stateManager)
- {
- _connection = connection;
- _stateManager = stateManager;
- _currentState = AMQPState.CONNECTION_UNDEFINED;
- _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
- }
-
- /**
- * -------------------------------------------
- * API Methods
- * --------------------------------------------
- */
/**
* Opens the TCP connection and let the formalities begin.
*/
- public ConnectionStartBody openTCPConnection() throws AMQPException
- {
- _lock.lock();
- // open the TCP connection
- try
- {
- _connectionStartBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_UNDEFINED, _currentState, AMQPState.CONNECTION_NOT_STARTED);
- _phase = _connection.connect();
-
- // waiting for ConnectionStartBody or error in connection
- //_connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _connectionNotStarted.await();
-
- checkIfConnectionClosed();
- AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time");
- notifyState(AMQPState.CONNECTION_NOT_STARTED);
- _currentState = AMQPState.CONNECTION_NOT_STARTED;
- return _connectionStartBody;
- }
- catch (InterruptedException e)
- {
- throw new AMQPException("Error opening connection to broker", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
+ public abstract ConnectionStartBody openTCPConnection() throws AMQPException;
/**
* The current java broker implementation can send a connection tune body
* as a response to the startOk. Not sure if that is the correct behaviour.
*/
- public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _connectionSecureBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE);
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId);
- _phase.messageSent(msg);
- // _connectionNotSecure.await(_serverTimeOut,TimeUnit.MILLISECONDS);
- _connectionNotSecure.await();
-
- checkIfConnectionClosed();
- if (_connectionTuneBody != null)
- {
- notifyState(AMQPState.CONNECTION_NOT_TUNED);
- _currentState = AMQPState.CONNECTION_NOT_TUNED;
- return _connectionTuneBody;
- }
- else if (_connectionSecureBody != null)
- { // oops the server sent another challenge
- notifyState(AMQPState.CONNECTION_NOT_SECURE);
- _currentState = AMQPState.CONNECTION_NOT_SECURE;
- return _connectionSecureBody;
- }
- else
- {
- throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
- }
- }
- catch (InterruptedException e)
- {
- throw new AMQPException("Error in connection.startOk", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
+ public abstract AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException;
/**
* The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could
* issue a new challenge
*/
- public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _connectionTuneBody = null;
- _connectionSecureBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED);
-
- _connectionSecureBody = null; // The server could send a fresh challenge
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId);
- _phase.messageSent(msg);
-
- //_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _connectionNotTuned.await();
- checkIfConnectionClosed();
-
- if (_connectionTuneBody != null)
- {
- notifyState(AMQPState.CONNECTION_NOT_TUNED);
- _currentState = AMQPState.CONNECTION_NOT_TUNED;
- return _connectionTuneBody;
- }
- else if (_connectionSecureBody != null)
- { // oops the server sent another challenge
- notifyState(AMQPState.CONNECTION_NOT_SECURE);
- _currentState = AMQPState.CONNECTION_NOT_SECURE;
- return _connectionSecureBody;
- }
- else
- {
- throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
- }
- }
- catch (InterruptedException e)
- {
- throw new AMQPException("Error in connection.secureOk", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- public void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, AMQPState.CONNECTION_NOT_OPENED);
- _connectionSecureBody = null;
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId);
- _phase.messageSent(msg);
- notifyState(AMQPState.CONNECTION_NOT_OPENED);
- _currentState = AMQPState.CONNECTION_NOT_OPENED;
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- public ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- // If the broker sends a connection close due to an error with the
- // Connection tune ok, then this call will verify that
- checkIfConnectionClosed();
-
- _connectionOpenOkBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN);
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
-
- //_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _connectionNotOpened.await();
-
- checkIfConnectionClosed();
- AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time");
- notifyState(AMQPState.CONNECTION_OPEN);
- _currentState = AMQPState.CONNECTION_OPEN;
- return _connectionOpenOkBody;
- }
- catch (InterruptedException e)
- {
- throw new AMQPException("Error in connection.open", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- public ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _connectionCloseOkBody = null;
- checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED);
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
- _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time");
- notifyState(AMQPState.CONNECTION_CLOSED);
- _currentState = AMQPState.CONNECTION_CLOSED;
- return _connectionCloseOkBody;
- }
- catch (InterruptedException e)
- {
- throw new AMQPException("Error in connection.close", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /**
- * -------------------------------------------
- * AMQMethodListener methods
- * --------------------------------------------
- */
- public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
- {
- _lock.lock();
- try
- {
- _correlationId = evt.getCorrelationId();
-
- if (evt.getMethod() instanceof ConnectionStartBody)
- {
- _connectionStartBody = (ConnectionStartBody) evt.getMethod();
- _connectionNotStarted.signalAll();
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionSecureBody)
- {
- _connectionSecureBody = (ConnectionSecureBody) evt.getMethod();
- _connectionNotSecure.signal();
- _connectionNotTuned.signal(); // in case the server has sent another chanllenge
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionTuneBody)
- {
- _connectionTuneBody = (ConnectionTuneBody) evt.getMethod();
- _connectionNotSecure.signal(); //if the server does the auth with ConntectionStartOk
- _connectionNotTuned.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionOpenOkBody)
- {
- _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod();
- _connectionNotOpened.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionCloseOkBody)
- {
- _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod();
- _connectionNotClosed.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionCloseBody)
- {
- _connectionCloseBody = (ConnectionCloseBody) evt.getMethod();
- // release the correct lock as u may have some conditions waiting.
- // while an error occured and the broker has sent a close.
- releaseLocks();
- handleClose();
- return true;
- }
- else
- {
- return false;
- }
- }
- finally
- {
- _lock.unlock();
- }
- }
-
-
- public Phase getPhasePipe()
- {
- return _phase;
- }
-
- private void handleClose() throws AMQPException
- {
- try
- {
- notifyState(AMQPState.CONNECTION_CLOSING);
- _currentState = AMQPState.CONNECTION_CLOSING;
- // do the required cleanup and send a ConnectionCloseOkBody
- }
- catch (Exception e)
- {
- throw new AMQPException("Error handling connection.close from broker", e);
- }
- }
-
- private void checkIfConnectionClosed() throws AMQPException
- {
- if (_connectionCloseBody != null)
- {
- String error = "Broker has closed connection due to : " + _connectionCloseBody.getReplyText() + " with reply code ("
- + _connectionCloseBody.getReplyCode() + ") " + "caused by class " + _connectionCloseBody.getClassId() + " and method "
- + _connectionCloseBody.getMethod();
+ public abstract AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException;
- throw new AMQPException(error);
- }
- }
+ public abstract void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException;
- private void releaseLocks()
- {
- if (_currentState == AMQPState.CONNECTION_NOT_OPENED)
- {
- _connectionNotOpened.signal();
- }
- else if (_currentState == AMQPState.CONNECTION_UNDEFINED)
- {
- _connectionNotStarted.signal();
- }
- else if (_currentState == AMQPState.CONNECTION_NOT_STARTED)
- {
- _connectionNotSecure.signal();
- }
- else if (_currentState == AMQPState.CONNECTION_NOT_SECURE)
- {
- _connectionNotTuned.signal();
- }
- }
+ public abstract ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException;
- private void notifyState(AMQPState newState) throws AMQPException
- {
- _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CONNECTION_STATE));
- }
+ public abstract ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException;
} \ No newline at end of file